diff options
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 18 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4b166e28ff..5702625915 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -468,37 +468,29 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> do_if_unconfirmed([MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], QPid, State). -%% clears references to MsgSeqNo and does internal_flush_confirms -do_if_unconfirmed(MsgSeqNos, undefined, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> +do_if_unconfirmed(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> MS = gb_sets:from_list(MsgSeqNos), - Unconfirmed1 = gb_sets:difference(UC, MS), - QFM1 = dict:filter(fun(M, _Q) -> - not(gb_sets:is_element(M, MS)) - end, QFM), - internal_flush_confirms(State#ch{unconfirmed = Unconfirmed1, - queues_for_msg = QFM1}, MsgSeqNos); + QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), + flush_confirms(State#ch{unconfirmed = gb_sets:difference(UC, MS), + queues_for_msg = QFM1}, MsgSeqNos); do_if_unconfirmed(MsgSeqNos, QPid, State) -> {DoneMessages, State1} = lists:foldl( - fun(MsgSeqNo, - {DMs, State0 = #ch{unconfirmed = UC0, - queues_for_msg = QFM0}}) -> + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> {ok, Qs} = dict:find(MsgSeqNo, QFM0), Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of 0 -> {[MsgSeqNo | DMs], State0#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM0), - unconfirmed = - gb_sets:delete(MsgSeqNo, UC0)}}; + queues_for_msg = dict:erase(MsgSeqNo, QFM0), + unconfirmed = gb_sets:delete(MsgSeqNo, UC0)}}; _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), {DMs, State0#ch{queues_for_msg = QFM1}} end end, {[], State}, MsgSeqNos), - internal_flush_confirms(State1, DoneMessages). + flush_confirms(State1, DoneMessages). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1231,10 +1223,9 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_flush_confirms(State, []) -> +flush_confirms(State, []) -> State; -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - unconfirmed = UC}, Cs) -> +flush_confirms(State = #ch{writer_pid = WriterPid, unconfirmed = UC}, Cs) -> SCs = lists:usort(Cs), CutOff = case gb_sets:is_empty(UC) of true -> lists:last(SCs) + 1; @@ -1244,15 +1235,13 @@ internal_flush_confirms(State = #ch{writer_pid = WriterPid, case Ms of [] -> ok; _ -> ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = lists:last(Ms), - multiple = true}) + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) end, - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Ss), + ok = lists:foldl(fun(T, ok) -> + rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = T}) + end, ok, Ss), State. terminate(_State) -> |
