diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2018-10-01 17:38:07 +0100 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2018-10-01 17:38:07 +0100 |
| commit | 8b37501451d2ef66dbf16b62b84684b74157dcbc (patch) | |
| tree | 21c197790255213e38b8fdfde6bff02fe7cfc9b4 | |
| parent | c1fb658ce13ae7f14919bd784eddd3dc77ab5144 (diff) | |
| download | rabbitmq-server-git-8b37501451d2ef66dbf16b62b84684b74157dcbc.tar.gz | |
Take reject into account when sending confirms and vice-versa.
Before bf531fd017cbec756ee979299723adce76828c96 rejects were not
collected like confirms and extracted from unconfirmed.
When adding the feature the important detail was missed:
if unconfirmed dtree is empty, confirms will be sent as multiple
confirming all messages up to latest.
If there are rejects recorded, the channel can send multiple confirm
and then reject right after with a lower message ID, which makes clients
fail.
Reported in php-amqplib/php-amqplib#597
| -rw-r--r-- | src/rabbit_channel.erl | 85 |
1 files changed, 48 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f270f2cc66..a9ead06462 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1579,9 +1579,10 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - send_nacks(MXs, State1#ch{unconfirmed = UC1}); + record_rejects(MXs, State1#ch{unconfirmed = UC1}); + %% TODO send_confirms_and_nacks here? false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State1#ch{unconfirmed = UC1}) + record_confirms(MXs, State1#ch{unconfirmed = UC1}) end. @@ -1911,37 +1912,29 @@ process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. -send_nacks([], State) -> - State; -send_nacks(_MXs, State = #ch{state = closing, - tx = none}) -> %% optimisation - State; -send_nacks(MXs, State = #ch{tx = none}) -> - coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], - fun(MsgSeqNo, Multiple) -> - #'basic.nack'{delivery_tag = MsgSeqNo, - multiple = Multiple} - end, State); -send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation - State#ch{tx = failed}; -send_nacks(_, State) -> - maybe_complete_tx(State#ch{tx = failed}). - send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> case rabbit_node_monitor:pause_partition_guard() of - ok -> ConfirmMsgSeqNos = - lists:foldl( - fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS(exchange_stats, XName, 1, - confirm, State), - [MsgSeqNo | MSNs] - end, [], lists:append(C)), - State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}), - %% TODO: msg seq nos, same as for confirms. Need to implement - %% nack rates first. - send_nacks(lists:append(R), State1#ch{rejected = []}); + ok -> + Confirms = lists:append(C), + Rejects = lists:append(R), + ConfirmMsgSeqNos = + lists:foldl( + fun ({MsgSeqNo, XName}, MSNs) -> + ?INCR_STATS(exchange_stats, XName, 1, confirm, State), + [MsgSeqNo | MSNs] + end, [], Confirms), + RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects], + + State1 = send_confirms(ConfirmMsgSeqNos, + RejectMsgSeqNos, + State#ch{confirmed = []}), + %% TODO: msg seq nos, same as for confirms. Need to implement + %% nack rates first. + send_nacks(RejectMsgSeqNos, + ConfirmMsgSeqNos, + State1#ch{rejected = []}); pausing -> State end; send_confirms_and_nacks(State) -> @@ -1950,25 +1943,43 @@ send_confirms_and_nacks(State) -> pausing -> State end. -send_confirms([], State) -> +send_nacks([], _, State) -> State; -send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation +send_nacks(_Rs, _, State = #ch{state = closing, + tx = none}) -> %% optimisation State; -send_confirms([MsgSeqNo], State) -> +send_nacks(Rs, Cs, State = #ch{tx = none}) -> + coalesce_and_send(Rs, Cs, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State); +send_nacks(_MXs, _, State = #ch{state = closing}) -> %% optimisation + State#ch{tx = failed}; +send_nacks(_, _, State) -> + maybe_complete_tx(State#ch{tx = failed}). + +send_confirms([], _, State) -> + State; +send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], _, State) -> ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), State; -send_confirms(Cs, State) -> - coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> +send_confirms(Cs, Rs, State) -> + coalesce_and_send(Cs, Rs, + fun(MsgSeqNo, Multiple) -> #'basic.ack'{delivery_tag = MsgSeqNo, multiple = Multiple} - end, State). + end, State). -coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> +coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case dtree:is_empty(UC) of + UnconfirmedCutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, + CutOff = lists:min([UnconfirmedCutOff | NegativeMsgSeqNos]), {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; |
