summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-01 14:22:00 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-01 14:22:00 +0100
commitc5b933c07e2390b372171b8e2ac574b866ea7611 (patch)
tree4bd4630fc7859d7a4ed52da009d3b9af82de3de9
parent5cd95d69d8f28fc08fa919c0b9f62689f2594f93 (diff)
downloadrabbitmq-server-git-c5b933c07e2390b372171b8e2ac574b866ea7611.tar.gz
Publish confirm fixes
Some subtle behavour around classic queue confirms got misunderstood in the refactoring.
-rw-r--r--src/rabbit_classic_queue.erl63
1 files changed, 41 insertions, 22 deletions
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 7d94c05332..5eea2ba5d3 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -4,9 +4,13 @@
-include("amqqueue.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
+-record(msg_status, {pending :: [pid()],
+ confirmed = [] :: [pid()]}).
+
-record(?MODULE, {pid :: undefined | pid(), %% the current master pid
qref :: term(), %% TODO
- unconfirmed = #{} :: #{non_neg_integer() => [pid()]}}).
+ unconfirmed = #{} ::
+ #{non_neg_integer() => #msg_status{}}}).
-define(STATE, ?MODULE).
-opaque state() :: #?STATE{}.
@@ -216,7 +220,9 @@ credit(CTag, Credit, Drain, State) ->
handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef,
unconfirmed = U0} = State) ->
- {Unconfirmed, ConfirmedSeqNos} = confirm_seq_nos(MsgSeqNos, Pid, U0),
+ %% confirms should never result in rejections
+ {Unconfirmed, ConfirmedSeqNos, []} =
+ settle_seq_nos(MsgSeqNos, Pid, U0, confirm),
Actions = [{settled, QRef, ConfirmedSeqNos}],
%% handle confirm event from queues
%% in this case the classic queue should track each individual publish and
@@ -241,22 +247,22 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
eol;
false ->
%% this assumes the mirror isn't part of the active set
- %% so we can confirm this particular pid
MsgSeqNos = maps:keys(
- maps:filter(fun (_, Pids) ->
+ maps:filter(fun (_, #msg_status{pending = Pids}) ->
lists:member(Pid, Pids)
end, U0)),
- %% if the exit is normal, treat it as a "confirm"
- {Unconfirmed, ConfirmedSeqNos} = confirm_seq_nos(MsgSeqNos, Pid, U0),
- Actions = [{settled, QRef, ConfirmedSeqNos}],
+ {Unconfirmed, ConfirmedSeqNos, Rejected} =
+ settle_seq_nos(MsgSeqNos, Pid, U0, down),
+ Actions = [{settled, QRef, ConfirmedSeqNos},
+ {rejected, QRef, Rejected}],
{ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions};
true ->
%% any abnormal exit should be considered a full reject of the
%% oustanding message ids - If the message didn't get to all
%% mirrors we have to assume it will never get there
MsgIds = maps:fold(
- fun (SeqNo, Pids, Acc) ->
- case lists:member(Pid, Pids) of
+ fun (SeqNo, Status, Acc) ->
+ case lists:member(Pid, Status#msg_status.pending) of
true ->
[SeqNo | Acc];
false ->
@@ -344,7 +350,7 @@ qpids(Qs, MsgNo) ->
S = case S0 of
#?STATE{unconfirmed = U0} ->
Rec = [QPid | SPids],
- U = U0#{MsgNo => Rec},
+ U = U0#{MsgNo => #msg_status{pending = Rec}},
S0#?STATE{unconfirmed = U};
stateless ->
S0
@@ -433,20 +439,33 @@ reject_seq_no(SeqNo, U0, Acc) ->
{U0, Acc}
end.
-confirm_seq_nos(MsgSeqNos, Pid, U0) ->
+settle_seq_nos(MsgSeqNos, Pid, U0, Reason) ->
lists:foldl(
- fun (SeqNo, {U, A0}) ->
+ fun (SeqNo, {U, C0, R0}) ->
case U of
- #{SeqNo := Pids0} ->
- case lists:delete(Pid, Pids0) of
- [] ->
- %% the updated unconfirmed state
- %% and the seqnos to settle
- {maps:remove(SeqNo, U), [SeqNo | A0]};
- Pids ->
- {U#{SeqNo => Pids}, A0}
+ #{SeqNo := Status0} ->
+ case update_msg_status(Reason, Pid, Status0) of
+ #msg_status{pending = [],
+ confirmed = []} ->
+ %% no pending left and nothing confirmed
+ %% then we reject it
+ {maps:remove(SeqNo, U), C0, [SeqNo | R0]};
+ #msg_status{pending = [],
+ confirmed = _} ->
+ %% this can be confirmed as there are no pending
+ %% and confirmed isn't empty
+ {maps:remove(SeqNo, U), [SeqNo | C0], R0};
+ MsgStatus ->
+ {U#{SeqNo => MsgStatus}, C0, R0}
end;
_ ->
- {U, A0}
+ {U, C0, R0}
end
- end, {U0, []}, MsgSeqNos).
+ end, {U0, [], []}, MsgSeqNos).
+
+update_msg_status(confirm, Pid, #msg_status{pending = P,
+ confirmed = C} = S) ->
+ Rem = lists:delete(Pid, P),
+ S#msg_status{pending = Rem, confirmed = [Pid | C]};
+update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
+ S#msg_status{pending = lists:delete(Pid, P)}.