diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-10-01 14:22:00 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-10-01 14:22:00 +0100 |
| commit | c5b933c07e2390b372171b8e2ac574b866ea7611 (patch) | |
| tree | 4bd4630fc7859d7a4ed52da009d3b9af82de3de9 /src | |
| parent | 5cd95d69d8f28fc08fa919c0b9f62689f2594f93 (diff) | |
| download | rabbitmq-server-git-c5b933c07e2390b372171b8e2ac574b866ea7611.tar.gz | |
Publish confirm fixes
Some subtle behavour around classic queue confirms got misunderstood
in the refactoring.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_classic_queue.erl | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 7d94c05332..5eea2ba5d3 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -4,9 +4,13 @@ -include("amqqueue.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-record(msg_status, {pending :: [pid()], + confirmed = [] :: [pid()]}). + -record(?MODULE, {pid :: undefined | pid(), %% the current master pid qref :: term(), %% TODO - unconfirmed = #{} :: #{non_neg_integer() => [pid()]}}). + unconfirmed = #{} :: + #{non_neg_integer() => #msg_status{}}}). -define(STATE, ?MODULE). -opaque state() :: #?STATE{}. @@ -216,7 +220,9 @@ credit(CTag, Credit, Drain, State) -> handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef, unconfirmed = U0} = State) -> - {Unconfirmed, ConfirmedSeqNos} = confirm_seq_nos(MsgSeqNos, Pid, U0), + %% confirms should never result in rejections + {Unconfirmed, ConfirmedSeqNos, []} = + settle_seq_nos(MsgSeqNos, Pid, U0, confirm), Actions = [{settled, QRef, ConfirmedSeqNos}], %% handle confirm event from queues %% in this case the classic queue should track each individual publish and @@ -241,22 +247,22 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef, eol; false -> %% this assumes the mirror isn't part of the active set - %% so we can confirm this particular pid MsgSeqNos = maps:keys( - maps:filter(fun (_, Pids) -> + maps:filter(fun (_, #msg_status{pending = Pids}) -> lists:member(Pid, Pids) end, U0)), - %% if the exit is normal, treat it as a "confirm" - {Unconfirmed, ConfirmedSeqNos} = confirm_seq_nos(MsgSeqNos, Pid, U0), - Actions = [{settled, QRef, ConfirmedSeqNos}], + {Unconfirmed, ConfirmedSeqNos, Rejected} = + settle_seq_nos(MsgSeqNos, Pid, U0, down), + Actions = [{settled, QRef, ConfirmedSeqNos}, + {rejected, QRef, Rejected}], {ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions}; true -> %% any abnormal exit should be considered a full reject of the %% oustanding message ids - If the message didn't get to all %% mirrors we have to assume it will never get there MsgIds = maps:fold( - fun (SeqNo, Pids, Acc) -> - case lists:member(Pid, Pids) of + fun (SeqNo, Status, Acc) -> + case lists:member(Pid, Status#msg_status.pending) of true -> [SeqNo | Acc]; false -> @@ -344,7 +350,7 @@ qpids(Qs, MsgNo) -> S = case S0 of #?STATE{unconfirmed = U0} -> Rec = [QPid | SPids], - U = U0#{MsgNo => Rec}, + U = U0#{MsgNo => #msg_status{pending = Rec}}, S0#?STATE{unconfirmed = U}; stateless -> S0 @@ -433,20 +439,33 @@ reject_seq_no(SeqNo, U0, Acc) -> {U0, Acc} end. -confirm_seq_nos(MsgSeqNos, Pid, U0) -> +settle_seq_nos(MsgSeqNos, Pid, U0, Reason) -> lists:foldl( - fun (SeqNo, {U, A0}) -> + fun (SeqNo, {U, C0, R0}) -> case U of - #{SeqNo := Pids0} -> - case lists:delete(Pid, Pids0) of - [] -> - %% the updated unconfirmed state - %% and the seqnos to settle - {maps:remove(SeqNo, U), [SeqNo | A0]}; - Pids -> - {U#{SeqNo => Pids}, A0} + #{SeqNo := Status0} -> + case update_msg_status(Reason, Pid, Status0) of + #msg_status{pending = [], + confirmed = []} -> + %% no pending left and nothing confirmed + %% then we reject it + {maps:remove(SeqNo, U), C0, [SeqNo | R0]}; + #msg_status{pending = [], + confirmed = _} -> + %% this can be confirmed as there are no pending + %% and confirmed isn't empty + {maps:remove(SeqNo, U), [SeqNo | C0], R0}; + MsgStatus -> + {U#{SeqNo => MsgStatus}, C0, R0} end; _ -> - {U, A0} + {U, C0, R0} end - end, {U0, []}, MsgSeqNos). + end, {U0, [], []}, MsgSeqNos). + +update_msg_status(confirm, Pid, #msg_status{pending = P, + confirmed = C} = S) -> + Rem = lists:delete(Pid, P), + S#msg_status{pending = Rem, confirmed = [Pid | C]}; +update_msg_status(down, Pid, #msg_status{pending = P} = S) -> + S#msg_status{pending = lists:delete(Pid, P)}. |
