diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 17:03:26 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 17:03:26 +0000 |
| commit | cd2b0f85f5a1d351d7e287caad1e283d21d5d945 (patch) | |
| tree | f21d632b4b7d216ab0154fe264557d09de453529 /src | |
| parent | 38f64271fecd7fab463f0c27f7a4a1eda2307c07 (diff) | |
| download | rabbitmq-server-git-cd2b0f85f5a1d351d7e287caad1e283d21d5d945.tar.gz | |
partial wait-for-DLQ-before-deleting-queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 96 |
1 files changed, 46 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c831a443f2..059fa08075 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,7 +52,7 @@ ttl_timer_ref, publish_seqno, unconfirmed, - blocked_purge, + blocked_op, dlx }). @@ -136,7 +136,7 @@ init(Q) -> dlx = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), - blocked_purge = undefined, + blocked_op = undefined, msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -161,7 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, ttl = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), - blocked_purge = undefined, + blocked_op = undefined, msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -723,19 +723,15 @@ mk_dead_letter_fun(Reason, _State) -> BQS1 end. -maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) -> +dead_letter_deleted_queue(_From, State = #q{dlx = undefined}) -> State; -maybe_dead_letter_queue(Reason, State = #q{ - backing_queue_state = BQS, - backing_queue = BQ}) -> - case BQ:fetch(false, BQS) of - {empty, BQS1} -> - State#q{backing_queue_state = BQS1}; - {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} -> - State1 = dead_letter_msg(Msg, undefined, Reason, - State#q{backing_queue_state = BQS1}), - maybe_dead_letter_queue(Reason, State1) - end. +dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:dropwhile(fun (_) -> true end, + mk_dead_letter_fun(queue_deleted, State), + BQS), + State#q{backing_queue_state = BQS1, + blocked_op = {delete, {From, BQ:len(BQS)}}}. dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, unconfirmed = UC, @@ -1071,7 +1067,7 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), @@ -1081,8 +1077,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - State1 = maybe_dead_letter_queue(queue_deleted, State), - {stop, normal, {ok, BQ:len(BQS)}, State1} + noreply(dead_letter_deleted_queue(From, State)) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1098,7 +1093,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ, mk_dead_letter_fun(queue_purged, State), BQS), noreply(State#q{backing_queue_state = BQS1, - blocked_purge = {From, BQ:len(BQS)}}); + blocked_op = {purge, {From, BQ:len(BQS)}}}); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1138,8 +1133,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - State1 = maybe_dead_letter_queue(queue_deleted, State), - {stop, normal, State1}; + State1 = dead_letter_deleted_queue(undefined, State), + noreply(State1); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1196,34 +1191,35 @@ handle_cast({confirm, MsgSeqNos, _From}, State = #q{unconfirmed = UC, backing_queue = BQ, backing_queue_state = BQS, - blocked_purge = Purge}) -> - {BQS3, UC3} = lists:foldl( - fun (MsgSeqNo, {BQS1, UC1}) -> - Reason = gb_trees:get(MsgSeqNo, UC1), - %% FIXME Forward confirms to channels - {_, BQS2} = - case Reason of - {expired, AckTag} -> - BQ:ack([AckTag], undefined, BQS1); - {rejected, AckTag} -> - BQ:ack([AckTag], undefined, BQS1); - {queue_deleted, _} -> - BQS; - {queue_purged, AckTag} -> - BQ:ack([AckTag], undefined, BQS1) - end, - {BQS2, gb_trees:delete(MsgSeqNo, UC1)} - end, {BQS, UC}, MsgSeqNos), - Purge1 = case {gb_trees:is_empty(UC3), Purge} of - {true, {From, Count}} -> - gen_server2:reply(From, {ok, Count}), - undefined; - _ -> - Purge - end, - noreply(State#q{unconfirmed = UC3, - blocked_purge = Purge1, - backing_queue_state = BQS3}); + blocked_op = Op}) -> + {BQS3, UC3} = + lists:foldl( + fun (MsgSeqNo, {BQS1, UC1}) -> + Reason = gb_trees:get(MsgSeqNo, UC1), + %% FIXME Forward confirms to channels + {_, BQS2} = + case Reason of + {Reason, AckTag} when Reason =:= expired; + Reason =:= rejected; + Reason =:= queue_deleted; + Reason =:= queue_purged -> + BQ:ack([AckTag], undefined, BQS1) + end, + {BQS2, gb_trees:delete(MsgSeqNo, UC1)} + end, {BQS, UC}, MsgSeqNos), + State1 = State#q{unconfirmed = UC3, + backing_queue_state = BQS3, + blocked_op = undefined}, + case {gb_trees:is_empty(UC3), Op} of + {true, {purge, {From, Count}}} -> + gen_server2:reply(From, {ok, Count}), + noreply(State1); + {true, {delete, {From, Count}}} -> + gen_server2:reply(From, {ok, Count}), + {stop, normal, State1}; + _ -> + noreply(State#q{blocked_op = Op}) + end; handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> noreply(dead_letter_msg(Msg, Extra, Reason, State)). @@ -1231,7 +1227,7 @@ handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - State1 = maybe_dead_letter_queue(queue_deleted, State), + State1 = dead_letter_deleted_queue(undefined, State), {stop, normal, State1}; false -> noreply(ensure_expiry_timer(State)) end; |
