diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-02-28 15:33:10 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-02-28 15:33:10 +0100 |
| commit | 7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe (patch) | |
| tree | f3035422a09194a91ba49512fed85c15463fae66 /src | |
| parent | eb061384fe508a8acaa39b75d32afd7c5a8ecee2 (diff) | |
| parent | 877c1df3098af717f043e2fc8db32aae05257873 (diff) | |
| download | rabbitmq-server-git-7aa77b2f0b832fd1fd4fa08f8e0ef86a7009e1fe.tar.gz | |
Merge pull request #2262 from rabbitmq/qq-dead-letter-bug
Fix QQ dead letter crash
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 72109cfbf4..d5fab879c2 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1162,12 +1162,26 @@ dead_letter_effects(_Reason, _Discarded, dead_letter_effects(Reason, Discarded, #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> - [{Reason, Msg} | Acc]; - (_, _, Acc) -> - Acc - end, [], Discarded), - [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. + RaftIdxs = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + [RaftIdx | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{log, RaftIdxs, + fun (Log) -> + Lookup = maps:from_list(lists:zip(RaftIdxs, Log)), + DeadLetters = maps:fold( + fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) -> + {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup), + [{Reason, Msg} | Acc]; + (_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc]; + (_, _, Acc) -> + Acc + end, [], Discarded), + [{mod_call, Mod, Fun, Args ++ [DeadLetters]}] + end} | Effects]. cancel_consumer_effects(ConsumerId, #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> @@ -1235,7 +1249,8 @@ return_one(MsgId, 0, {Tag, Header0}, %% this should not affect the release cursor in any way Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, {Msg, State1} = case Tag of - '$empty_msg' -> {Msg0, State0}; + '$empty_msg' -> + {Msg0, State0}; _ -> case evaluate_memory_limit(Header, State0) of true -> {{'$empty_msg', Header}, State0}; @@ -1310,14 +1325,15 @@ checkout(#{index := Index}, State0, Effects0) -> {State, ok, Effects} end. -checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, - {SendAcc, LogAcc0}) -> +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, + Effects, {SendAcc, LogAcc0}) -> DelMsg = {RaftIdx, {MsgId, Header}}, LogAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, [DelMsg], LogAcc0), checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); -checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) -> +checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, + {SendAcc0, LogAcc}) -> DelMsg = {MsgId, Msg}, SendAcc = maps:update_with(ConsumerId, fun (M) -> [DelMsg | M] end, @@ -1326,10 +1342,12 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> Effects1 = case Activity of nochange -> - append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc); + append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)] + | append_send_msg_effects( + append_log_effects(Effects0, LogAcc), SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1441,7 +1459,7 @@ send_log_effect({CTag, CPid}, IdxMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> + Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> {MsgId, {Header, Msg}} end, Log, Data), [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] |
