diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5d3d939a73..3c1aa0e314 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ ttl_timer_ref, publish_seqno, unconfirmed, + unconfirmed_qm, blocked_op, queue_monitors, dlx, @@ -139,6 +140,7 @@ init(Q) -> dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), blocked_op = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -165,6 +167,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, ttl = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), blocked_op = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -768,7 +771,23 @@ dead_letter_msg(Msg, AckTag, Reason, case QPids of [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS), cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> noreply(State2#q{ + _ -> State3 = + lists:foldl( + fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, + MsgSeqNos), + UQM1 = gb_trees:update(QPid, MsgSeqNos1, + UQM), + State0#q{unconfirmed_qm = UQM1}; + none -> + S = gb_sets:singleton(MsgSeqNo), + UQM1 = gb_trees:insert(QPid, S, UQM), + State0#q{unconfirmed_qm = UQM1} + end + end, State2, QPids), + noreply(State3#q{ unconfirmed = gb_trees:insert( MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC)}) @@ -782,6 +801,13 @@ monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> QMons)} end. +demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + {ok, MRef} -> erlang:demonitor(MRef), + State#q{queue_monitors = dict:erase(QPid, QMons)}; + error -> State + end. + handle_queue_down(QPid, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> case dict:find(QPid, QMons) of @@ -798,6 +824,7 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons, end. handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, + unconfirmed_qm = UQM, backing_queue = BQ, backing_queue_state = BQS}) -> {BQS3, UC3} = @@ -813,8 +840,19 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, {QPids1, AckTag}, UC1)} end end, {BQS, UC}, MsgSeqNos), - cleanup_after_confirm(State#q{unconfirmed = UC3, - backing_queue_state = BQS3}). + MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), + gb_sets:from_list(MsgSeqNos)), + State1 = case gb_sets:is_empty(MsgSeqNos1) of + false -> State#q{ + unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + true -> demonitor_queue( + QPid, State#q{ + unconfirmed_qm = + gb_trees:delete(QPid, UQM)}) + end, + cleanup_after_confirm(State1#q{unconfirmed = UC3, + backing_queue_state = BQS3}). cleanup_after_confirm(State = #q{blocked_op = Op, unconfirmed = UC}) -> |
