diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 13:54:28 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 13:54:28 +0000 |
| commit | 5e1647232f7e6ee3b71fdfd47b083591773e34e8 (patch) | |
| tree | a7cb374b63b3de43dc70557d222abf133dbd257f | |
| parent | cd2b0f85f5a1d351d7e287caad1e283d21d5d945 (diff) | |
| download | rabbitmq-server-git-5e1647232f7e6ee3b71fdfd47b083591773e34e8.tar.gz | |
wait for DLQ to confirm messages before deleting the queue
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 059fa08075..e6fddd1482 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -723,15 +723,22 @@ mk_dead_letter_fun(Reason, _State) -> BQS1 end. -dead_letter_deleted_queue(_From, State = #q{dlx = undefined}) -> - State; +dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) -> + {stop, normal, State}; +dead_letter_deleted_queue(_From, State = #q{dlx = undefined, + backing_queue_state = BQS, + backing_queue = BQ}) -> + {stop, normal, {ok, BQ:len(BQS)}, State}; dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQS1 = BQ:dropwhile(fun (_) -> true end, - mk_dead_letter_fun(queue_deleted, State), - BQS), - State#q{backing_queue_state = BQS1, - blocked_op = {delete, {From, BQ:len(BQS)}}}. + case BQ:len(BQS) of + 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined}); + _ -> BQS1 = BQ:dropwhile(fun (_) -> true end, + mk_dead_letter_fun(queue_deleted, State), + BQS), + noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}}, + backing_queue_state = BQS1}) + end. dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, unconfirmed = UC, @@ -1077,7 +1084,7 @@ handle_call({delete, IfUnused, IfEmpty}, From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - noreply(dead_letter_deleted_queue(From, State)) + dead_letter_deleted_queue(From, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1133,8 +1140,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - State1 = dead_letter_deleted_queue(undefined, State), - noreply(State1); + dead_letter_deleted_queue(undefined, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1195,10 +1201,9 @@ handle_cast({confirm, MsgSeqNos, _From}, {BQS3, UC3} = lists:foldl( fun (MsgSeqNo, {BQS1, UC1}) -> - Reason = gb_trees:get(MsgSeqNo, UC1), %% FIXME Forward confirms to channels {_, BQS2} = - case Reason of + case gb_trees:get(MsgSeqNo, UC1) of {Reason, AckTag} when Reason =:= expired; Reason =:= rejected; Reason =:= queue_deleted; @@ -1215,10 +1220,13 @@ handle_cast({confirm, MsgSeqNos, _From}, gen_server2:reply(From, {ok, Count}), noreply(State1); {true, {delete, {From, Count}}} -> - gen_server2:reply(From, {ok, Count}), + case From of + undefined -> ok; + _ -> gen_server2:reply(From, {ok, Count}) + end, {stop, normal, State1}; _ -> - noreply(State#q{blocked_op = Op}) + noreply(State1#q{blocked_op = Op}) end; handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> @@ -1227,8 +1235,7 @@ handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - State1 = dead_letter_deleted_queue(undefined, State), - {stop, normal, State1}; + dead_letter_deleted_queue(undefined, State); false -> noreply(ensure_expiry_timer(State)) end; |
