summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl31
1 files changed, 18 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 21b6089bc6..280d83f569 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -328,10 +328,13 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
-next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+next_state(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
+ MTC1 = confirm_messages(MsgIds, MTC),
+ State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) ->
end,
State.
-confirm_messages([], State) ->
- State;
-confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+confirm_messages([], MTC) ->
+ MTC;
+confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
end
end, {gb_trees:empty(), MTC}, MsgIds),
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
- State#q{msg_id_to_channel = MTC1}.
+ MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
@@ -457,14 +460,16 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
- message = #basic_message{id = MsgId}}, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case Confirm of
- true -> confirm_messages([MsgId], State);
- false -> State
- end,
+ message = #basic_message{id = MsgId}},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
+ MTC1 = case Confirm of
+ true -> confirm_messages([MsgId], MTC);
+ false -> MTC
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
- State1#q{backing_queue_state = BQS1}.
+ State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}.
run_message_queue(State) -> run_message_queue(false, State).