summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 17:03:26 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 17:03:26 +0000
commitcd2b0f85f5a1d351d7e287caad1e283d21d5d945 (patch)
treef21d632b4b7d216ab0154fe264557d09de453529 /src
parent38f64271fecd7fab463f0c27f7a4a1eda2307c07 (diff)
downloadrabbitmq-server-git-cd2b0f85f5a1d351d7e287caad1e283d21d5d945.tar.gz
partial wait-for-DLQ-before-deleting-queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl96
1 files changed, 46 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c831a443f2..059fa08075 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,7 +52,7 @@
ttl_timer_ref,
publish_seqno,
unconfirmed,
- blocked_purge,
+ blocked_op,
dlx
}).
@@ -136,7 +136,7 @@ init(Q) ->
dlx = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
- blocked_purge = undefined,
+ blocked_op = undefined,
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -161,7 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
ttl = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
- blocked_purge = undefined,
+ blocked_op = undefined,
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -723,19 +723,15 @@ mk_dead_letter_fun(Reason, _State) ->
BQS1
end.
-maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
+dead_letter_deleted_queue(_From, State = #q{dlx = undefined}) ->
State;
-maybe_dead_letter_queue(Reason, State = #q{
- backing_queue_state = BQS,
- backing_queue = BQ}) ->
- case BQ:fetch(false, BQS) of
- {empty, BQS1} ->
- State#q{backing_queue_state = BQS1};
- {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
- State1 = dead_letter_msg(Msg, undefined, Reason,
- State#q{backing_queue_state = BQS1}),
- maybe_dead_letter_queue(Reason, State1)
- end.
+dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ BQS1 = BQ:dropwhile(fun (_) -> true end,
+ mk_dead_letter_fun(queue_deleted, State),
+ BQS),
+ State#q{backing_queue_state = BQS1,
+ blocked_op = {delete, {From, BQ:len(BQS)}}}.
dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
unconfirmed = UC,
@@ -1071,7 +1067,7 @@ handle_call(stat, _From, State) ->
drop_expired_messages(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
@@ -1081,8 +1077,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- State1 = maybe_dead_letter_queue(queue_deleted, State),
- {stop, normal, {ok, BQ:len(BQS)}, State1}
+ noreply(dead_letter_deleted_queue(From, State))
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1098,7 +1093,7 @@ handle_call(purge, From, State = #q{backing_queue = BQ,
mk_dead_letter_fun(queue_purged, State),
BQS),
noreply(State#q{backing_queue_state = BQS1,
- blocked_purge = {From, BQ:len(BQS)}});
+ blocked_op = {purge, {From, BQ:len(BQS)}}});
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1138,8 +1133,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
end));
handle_cast(delete_immediately, State) ->
- State1 = maybe_dead_letter_queue(queue_deleted, State),
- {stop, normal, State1};
+ State1 = dead_letter_deleted_queue(undefined, State),
+ noreply(State1);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1196,34 +1191,35 @@ handle_cast({confirm, MsgSeqNos, _From},
State = #q{unconfirmed = UC,
backing_queue = BQ,
backing_queue_state = BQS,
- blocked_purge = Purge}) ->
- {BQS3, UC3} = lists:foldl(
- fun (MsgSeqNo, {BQS1, UC1}) ->
- Reason = gb_trees:get(MsgSeqNo, UC1),
- %% FIXME Forward confirms to channels
- {_, BQS2} =
- case Reason of
- {expired, AckTag} ->
- BQ:ack([AckTag], undefined, BQS1);
- {rejected, AckTag} ->
- BQ:ack([AckTag], undefined, BQS1);
- {queue_deleted, _} ->
- BQS;
- {queue_purged, AckTag} ->
- BQ:ack([AckTag], undefined, BQS1)
- end,
- {BQS2, gb_trees:delete(MsgSeqNo, UC1)}
- end, {BQS, UC}, MsgSeqNos),
- Purge1 = case {gb_trees:is_empty(UC3), Purge} of
- {true, {From, Count}} ->
- gen_server2:reply(From, {ok, Count}),
- undefined;
- _ ->
- Purge
- end,
- noreply(State#q{unconfirmed = UC3,
- blocked_purge = Purge1,
- backing_queue_state = BQS3});
+ blocked_op = Op}) ->
+ {BQS3, UC3} =
+ lists:foldl(
+ fun (MsgSeqNo, {BQS1, UC1}) ->
+ Reason = gb_trees:get(MsgSeqNo, UC1),
+ %% FIXME Forward confirms to channels
+ {_, BQS2} =
+ case Reason of
+ {Reason, AckTag} when Reason =:= expired;
+ Reason =:= rejected;
+ Reason =:= queue_deleted;
+ Reason =:= queue_purged ->
+ BQ:ack([AckTag], undefined, BQS1)
+ end,
+ {BQS2, gb_trees:delete(MsgSeqNo, UC1)}
+ end, {BQS, UC}, MsgSeqNos),
+ State1 = State#q{unconfirmed = UC3,
+ backing_queue_state = BQS3,
+ blocked_op = undefined},
+ case {gb_trees:is_empty(UC3), Op} of
+ {true, {purge, {From, Count}}} ->
+ gen_server2:reply(From, {ok, Count}),
+ noreply(State1);
+ {true, {delete, {From, Count}}} ->
+ gen_server2:reply(From, {ok, Count}),
+ {stop, normal, State1};
+ _ ->
+ noreply(State#q{blocked_op = Op})
+ end;
handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
noreply(dead_letter_msg(Msg, Extra, Reason, State)).
@@ -1231,7 +1227,7 @@ handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- State1 = maybe_dead_letter_queue(queue_deleted, State),
+ State1 = dead_letter_deleted_queue(undefined, State),
{stop, normal, State1};
false -> noreply(ensure_expiry_timer(State))
end;