summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-10-03 19:38:12 +0300
committerGitHub <noreply@github.com>2018-10-03 19:38:12 +0300
commit631f101ce1f8a8c368010437f44c8c9930b470ae (patch)
treef4a6723b9e8459536acb4624008ece8cdf2a12b6 /src
parentda4a6c0d31f8abd812b7f47384f32c816d31ceb1 (diff)
parent08075cf6c7a86eda49b18b94d00d81ad05fc6273 (diff)
downloadrabbitmq-server-git-631f101ce1f8a8c368010437f44c8c9930b470ae.tar.gz
Merge pull request #1719 from rabbitmq/rejects-confirms-interdependency
Take reject into account when sending confirms and vice-versa.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl86
1 files changed, 48 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f270f2cc66..3a90563079 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1579,9 +1579,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State1#ch{unconfirmed = UC1});
+ record_rejects(MXs, State1#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State1#ch{unconfirmed = UC1})
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
end.
@@ -1911,37 +1911,29 @@ process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.
-send_nacks([], State) ->
- State;
-send_nacks(_MXs, State = #ch{state = closing,
- tx = none}) -> %% optimisation
- State;
-send_nacks(MXs, State = #ch{tx = none}) ->
- coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
- fun(MsgSeqNo, Multiple) ->
- #'basic.nack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
- end, State);
-send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
- State#ch{tx = failed};
-send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx = failed}).
-
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
- ok -> ConfirmMsgSeqNos =
- lists:foldl(
- fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS(exchange_stats, XName, 1,
- confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
- State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
- %% TODO: msg seq nos, same as for confirms. Need to implement
- %% nack rates first.
- send_nacks(lists:append(R), State1#ch{rejected = []});
+ ok ->
+ Confirms = lists:append(C),
+ Rejects = lists:append(R),
+ ConfirmMsgSeqNos =
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ ?INCR_STATS(exchange_stats, XName, 1, confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], Confirms),
+ RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects],
+
+ State1 = send_confirms(ConfirmMsgSeqNos,
+ RejectMsgSeqNos,
+ State#ch{confirmed = []}),
+ %% TODO: msg seq nos, same as for confirms. Need to implement
+ %% nack rates first.
+ send_nacks(RejectMsgSeqNos,
+ ConfirmMsgSeqNos,
+ State1#ch{rejected = []});
pausing -> State
end;
send_confirms_and_nacks(State) ->
@@ -1950,26 +1942,44 @@ send_confirms_and_nacks(State) ->
pausing -> State
end.
-send_confirms([], State) ->
+send_nacks([], _, State) ->
State;
-send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+send_nacks(_Rs, _, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
State;
-send_confirms([MsgSeqNo], State) ->
+send_nacks(Rs, Cs, State = #ch{tx = none}) ->
+ coalesce_and_send(Rs, Cs,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State);
+send_nacks(_MXs, _, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
+send_nacks(_, _, State) ->
+ maybe_complete_tx(State#ch{tx = failed}).
+
+send_confirms([], _, State) ->
+ State;
+send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], _, State) ->
ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
State;
-send_confirms(Cs, State) ->
- coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+send_confirms(Cs, Rs, State) ->
+ coalesce_and_send(Cs, Rs,
+ fun(MsgSeqNo, Multiple) ->
#'basic.ack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State).
-coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
+coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case dtree:is_empty(UC) of
+ UnconfirmedCutoff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
+ Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
case Ms of
[] -> ok;
_ -> ok = send(MkMsgFun(lists:last(Ms), true), State)