diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 21b6089bc6..280d83f569 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -328,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -457,14 +460,16 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case Confirm of - true -> confirm_messages([MsgId], State); - false -> State - end, + message = #basic_message{id = MsgId}}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}. run_message_queue(State) -> run_message_queue(false, State). |
