summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-22 12:28:26 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-22 14:14:31 +0000
commit723972b3cbdf63e836c26150540b868833f30df8 (patch)
treebe06a96378691b700625412fbb462966fced795e /src
parente66284166322fce2a22218ecd5941d15f589a330 (diff)
downloadrabbitmq-server-git-723972b3cbdf63e836c26150540b868833f30df8.tar.gz
Fix rabbit_fifo poison handling
By ensuring the delivery count is retained when "dehydrating" the state in preparation for snapshotting. Now the entire message header map is stored which will take additional space w.r.t to keynamne duplication although this can be optimised. Also updated the property test to generate fake pids for multiple nodes so that multi node scenarios are better covered. [#163513253]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl111
1 files changed, 67 insertions, 44 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4ed2bb743a..39dbd8f3f1 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -81,7 +81,8 @@
%% in enqueue messages. Used to ensure ordering of messages send from the
%% same process
--type msg_header() :: #{delivery_count => non_neg_integer()}.
+-type msg_header() :: #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
%% The message header map:
%% delivery_count: the number of unsuccessful delivery attempts.
%% A non-zero value indicates a previous attempt.
@@ -94,7 +95,7 @@
-type indexed_msg() :: {ra_index(), msg()}.
--type prefix_msg() :: {'$prefix_msg', msg_size()}.
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -242,8 +243,8 @@
%% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_size()],
- PrefixMsgs :: [msg_size()]},
+ prefix_msgs = {[], []} :: {Return :: [msg_header()],
+ PrefixMsgs :: [msg_header()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
max_length :: maybe(non_neg_integer()),
@@ -970,11 +971,9 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
- Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
- State2 = append_to_master_index(RaftIdx,
- add_bytes_enqueue(Bytes, State1)),
+ State2 = append_to_master_index(RaftIdx, State1),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
@@ -991,7 +990,7 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}),
State, Effects0),
{State, Effects};
- {{'$prefix_msg', Bytes}, State1} ->
+ {{'$prefix_msg', #{size := Bytes}}, State1} ->
State = add_bytes_drop(Bytes, State1),
{State, Effects0};
empty ->
@@ -1001,12 +1000,14 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
- Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map
- State0#state{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ Size = message_size(RawMsg),
+ Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
+ State = add_bytes_enqueue(Size, State0),
+ State#state{messages = Messages#{NextMsgNum => Msg},
+ % this is probably only done to record it when low_msg_num
+ % is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
@@ -1088,11 +1089,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0, ConsumerId, Con);
- ({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0, ConsumerId, Con)
- end, {State0, Effects1}, MsgNumMsgs),
+ {State1, Effects2} = lists:foldl(
+ fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0,
+ ConsumerId, Con);
+ ({MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgNum, Msg, S0, E0,
+ ConsumerId, Con)
+ end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
Effects2).
@@ -1152,9 +1156,8 @@ dead_letter_effects(_Reason, _Discarded,
Effects;
dead_letter_effects(Reason, Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
- % MsgId, MsgIdID, RaftId, Header
- Acc) -> [{Reason, Msg} | Acc]
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc]
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
@@ -1200,24 +1203,44 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) ->
- {add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
+return_one(0, {'$prefix_msg', Header0},
+ #state{returns = Returns,
+ delivery_limit = DeliveryLimit} = State0, Effects0,
+ ConsumerId, Con) ->
+ Header = maps:update_with(delivery_count,
+ fun (C) -> C+1 end,
+ 1, Header0),
+ Msg = {'$prefix_msg', Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ Checked = Con#consumer.checked_out,
+ {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
+ Effects0, State0),
+ {add_bytes_settle(Msg, State1), Effects};
+ _ ->
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) ->
+ delivery_limit = DeliveryLimit} = State0,
+ Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
+ Msg = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
- Checked = maps:without([MsgNum], Con#consumer.checked_out),
- {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0),
+ DlMsg = {MsgNum, Msg},
+ Effects = dead_letter_effects(rejected, maps:put(none, DlMsg, #{}),
+ State0, Effects0),
+ Checked = Con#consumer.checked_out,
+ {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
+ Effects, State0),
{add_bytes_settle(RawMsg, State1), Effects1};
_ ->
- Msg = {RaftId, {Header, RawMsg}},
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
@@ -1293,9 +1316,9 @@ append_send_msg_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
+take_next_msg(#state{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {{'$prefix_msg', Bytes},
+ {{'$prefix_msg', Header},
State#state{prefix_msgs = {Rem, P}}};
take_next_msg(#state{returns = Returns,
low_msg_num = Low0,
@@ -1325,9 +1348,9 @@ take_next_msg(#state{returns = Returns,
end
end;
empty ->
- [Bytes | Rem] = P,
+ [Header | Rem] = P,
%% There are prefix msgs
- {{'$prefix_msg', Bytes},
+ {{'$prefix_msg', Header},
State#state{prefix_msgs = {R, Rem}}}
end.
@@ -1486,15 +1509,15 @@ dehydrate_state(#state{messages = Messages,
returns = Returns,
prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
%% TODO: optimise this function as far as possible
- PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
- [Bytes | Acc];
- ({_, {_, {_, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header| Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
@@ -1512,8 +1535,8 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
- (_, {_, {_, {_, Raw}}}) ->
- {'$prefix_msg', message_size(Raw)}
+ (_, {_, {_, {Header, _}}}) ->
+ {'$prefix_msg', Header}
end, Checked0),
Con#consumer{checked_out = Checked}.
@@ -1591,7 +1614,7 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', B}) ->
+message_size({'$prefix_msg', #{size := B}}) ->
B;
message_size(B) when is_binary(B) ->
byte_size(B);