diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-10-03 19:38:12 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-10-03 19:38:12 +0300 |
| commit | 631f101ce1f8a8c368010437f44c8c9930b470ae (patch) | |
| tree | f4a6723b9e8459536acb4624008ece8cdf2a12b6 /src | |
| parent | da4a6c0d31f8abd812b7f47384f32c816d31ceb1 (diff) | |
| parent | 08075cf6c7a86eda49b18b94d00d81ad05fc6273 (diff) | |
| download | rabbitmq-server-git-631f101ce1f8a8c368010437f44c8c9930b470ae.tar.gz | |
Merge pull request #1719 from rabbitmq/rejects-confirms-interdependency
Take reject into account when sending confirms and vice-versa.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 86 |
1 files changed, 48 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f270f2cc66..3a90563079 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1579,9 +1579,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - send_nacks(MXs, State1#ch{unconfirmed = UC1}); + record_rejects(MXs, State1#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State1#ch{unconfirmed = UC1}) + record_confirms(MXs, State1#ch{unconfirmed = UC1}) end. @@ -1911,37 +1911,29 @@ process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. -send_nacks([], State) -> - State; -send_nacks(_MXs, State = #ch{state = closing, - tx = none}) -> %% optimisation - State; -send_nacks(MXs, State = #ch{tx = none}) -> - coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], - fun(MsgSeqNo, Multiple) -> - #'basic.nack'{delivery_tag = MsgSeqNo, - multiple = Multiple} - end, State); -send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation - State#ch{tx = failed}; -send_nacks(_, State) -> - maybe_complete_tx(State#ch{tx = failed}). - send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> case rabbit_node_monitor:pause_partition_guard() of - ok -> ConfirmMsgSeqNos = - lists:foldl( - fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS(exchange_stats, XName, 1, - confirm, State), - [MsgSeqNo | MSNs] - end, [], lists:append(C)), - State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}), - %% TODO: msg seq nos, same as for confirms. Need to implement - %% nack rates first. - send_nacks(lists:append(R), State1#ch{rejected = []}); + ok -> + Confirms = lists:append(C), + Rejects = lists:append(R), + ConfirmMsgSeqNos = + lists:foldl( + fun ({MsgSeqNo, XName}, MSNs) -> + ?INCR_STATS(exchange_stats, XName, 1, confirm, State), + [MsgSeqNo | MSNs] + end, [], Confirms), + RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects], + + State1 = send_confirms(ConfirmMsgSeqNos, + RejectMsgSeqNos, + State#ch{confirmed = []}), + %% TODO: msg seq nos, same as for confirms. Need to implement + %% nack rates first. + send_nacks(RejectMsgSeqNos, + ConfirmMsgSeqNos, + State1#ch{rejected = []}); pausing -> State end; send_confirms_and_nacks(State) -> @@ -1950,26 +1942,44 @@ send_confirms_and_nacks(State) -> pausing -> State end. -send_confirms([], State) -> +send_nacks([], _, State) -> State; -send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation +send_nacks(_Rs, _, State = #ch{state = closing, + tx = none}) -> %% optimisation State; -send_confirms([MsgSeqNo], State) -> +send_nacks(Rs, Cs, State = #ch{tx = none}) -> + coalesce_and_send(Rs, Cs, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State); +send_nacks(_MXs, _, State = #ch{state = closing}) -> %% optimisation + State#ch{tx = failed}; +send_nacks(_, _, State) -> + maybe_complete_tx(State#ch{tx = failed}). + +send_confirms([], _, State) -> + State; +send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], _, State) -> ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), State; -send_confirms(Cs, State) -> - coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> +send_confirms(Cs, Rs, State) -> + coalesce_and_send(Cs, Rs, + fun(MsgSeqNo, Multiple) -> #'basic.ack'{delivery_tag = MsgSeqNo, multiple = Multiple} - end, State). + end, State). -coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> +coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case dtree:is_empty(UC) of + UnconfirmedCutoff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, - {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), + Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]), + {Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos), case Ms of [] -> ok; _ -> ok = send(MkMsgFun(lists:last(Ms), true), State) |
