summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2018-10-01 17:38:07 +0100
committerDaniil Fedotov <hairyhum@gmail.com>2018-10-01 17:38:07 +0100
commit8b37501451d2ef66dbf16b62b84684b74157dcbc (patch)
tree21c197790255213e38b8fdfde6bff02fe7cfc9b4
parentc1fb658ce13ae7f14919bd784eddd3dc77ab5144 (diff)
downloadrabbitmq-server-git-8b37501451d2ef66dbf16b62b84684b74157dcbc.tar.gz
Take reject into account when sending confirms and vice-versa.
Before bf531fd017cbec756ee979299723adce76828c96 rejects were not collected like confirms and extracted from unconfirmed. When adding the feature the important detail was missed: if unconfirmed dtree is empty, confirms will be sent as multiple confirming all messages up to latest. If there are rejects recorded, the channel can send multiple confirm and then reject right after with a lower message ID, which makes clients fail. Reported in php-amqplib/php-amqplib#597
-rw-r--r--src/rabbit_channel.erl85
1 files changed, 48 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f270f2cc66..a9ead06462 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1579,9 +1579,10 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State1#ch{unconfirmed = UC1});
+ record_rejects(MXs, State1#ch{unconfirmed = UC1});
+ %% TODO send_confirms_and_nacks here?
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State1#ch{unconfirmed = UC1})
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
end.
@@ -1911,37 +1912,29 @@ process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.
-send_nacks([], State) ->
- State;
-send_nacks(_MXs, State = #ch{state = closing,
- tx = none}) -> %% optimisation
- State;
-send_nacks(MXs, State = #ch{tx = none}) ->
- coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
- fun(MsgSeqNo, Multiple) ->
- #'basic.nack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
- end, State);
-send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
- State#ch{tx = failed};
-send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx = failed}).
-
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
- ok -> ConfirmMsgSeqNos =
- lists:foldl(
- fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS(exchange_stats, XName, 1,
- confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
- State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
- %% TODO: msg seq nos, same as for confirms. Need to implement
- %% nack rates first.
- send_nacks(lists:append(R), State1#ch{rejected = []});
+ ok ->
+ Confirms = lists:append(C),
+ Rejects = lists:append(R),
+ ConfirmMsgSeqNos =
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ ?INCR_STATS(exchange_stats, XName, 1, confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], Confirms),
+ RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects],
+
+ State1 = send_confirms(ConfirmMsgSeqNos,
+ RejectMsgSeqNos,
+ State#ch{confirmed = []}),
+ %% TODO: msg seq nos, same as for confirms. Need to implement
+ %% nack rates first.
+ send_nacks(RejectMsgSeqNos,
+ ConfirmMsgSeqNos,
+ State1#ch{rejected = []});
pausing -> State
end;
send_confirms_and_nacks(State) ->
@@ -1950,25 +1943,43 @@ send_confirms_and_nacks(State) ->
pausing -> State
end.
-send_confirms([], State) ->
+send_nacks([], _, State) ->
State;
-send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+send_nacks(_Rs, _, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
State;
-send_confirms([MsgSeqNo], State) ->
+send_nacks(Rs, Cs, State = #ch{tx = none}) ->
+ coalesce_and_send(Rs, Cs,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State);
+send_nacks(_MXs, _, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
+send_nacks(_, _, State) ->
+ maybe_complete_tx(State#ch{tx = failed}).
+
+send_confirms([], _, State) ->
+ State;
+send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], _, State) ->
ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
State;
-send_confirms(Cs, State) ->
- coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+send_confirms(Cs, Rs, State) ->
+ coalesce_and_send(Cs, Rs,
+ fun(MsgSeqNo, Multiple) ->
#'basic.ack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State).
-coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
+coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case dtree:is_empty(UC) of
+ UnconfirmedCutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
+ CutOff = lists:min([UnconfirmedCutOff | NegativeMsgSeqNos]),
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;