diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 981501f1a1..3d1464bc2d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -743,9 +743,12 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, backing_queue_state = BQS1}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> +dead_letter_msg(Msg, AckTag, Reason, + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX, + backing_queue = BQ, + backing_queue_state = BQS}) -> rabbit_exchange:lookup_or_die(DLX), {ok, _, QPids} = @@ -754,9 +757,15 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), State1 = lists:foldl(fun monitor_queue/2, State, QPids), - UC1 = gb_trees:insert(MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC), - State1#q{publish_seqno = MsgSeqNo + 1, - unconfirmed = UC1}. + State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + case QPids of + [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS), + cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); + _ -> noreply(State2#q{ + unconfirmed = gb_trees:insert( + MsgSeqNo, {gb_sets:from_list(QPids), + AckTag}, UC)}) + end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> case dict:is_key(QPid, QMons) of @@ -783,8 +792,7 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons, handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, backing_queue = BQ, - backing_queue_state = BQS, - blocked_op = Op}) -> + backing_queue_state = BQS}) -> {BQS3, UC3} = lists:foldl( fun (MsgSeqNo, {BQS1, UC1}) -> @@ -798,10 +806,13 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, {QPids1, AckTag}, UC1)} end end, {BQS, UC}, MsgSeqNos), - State1 = State#q{unconfirmed = UC3, - backing_queue_state = BQS3, - blocked_op = undefined}, - case {gb_trees:is_empty(UC3), Op} of + cleanup_after_confirm(State#q{unconfirmed = UC3, + backing_queue_state = BQS3}). + +cleanup_after_confirm(State = #q{blocked_op = Op, + unconfirmed = UC}) -> + State1 = State#q{blocked_op = undefined}, + case {gb_trees:is_empty(UC), Op} of {true, {purge, {From, Count}}} -> gen_server2:reply(From, {ok, Count}), noreply(State1); @@ -1269,7 +1280,7 @@ handle_cast({confirm, MsgSeqNos, QPid}, State) -> handle_confirm(MsgSeqNos, QPid, State); handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> - noreply(dead_letter_msg(Msg, AckTag, Reason, State)). + dead_letter_msg(Msg, AckTag, Reason, State). handle_info(maybe_expire, State) -> case is_unused(State) of |
