summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 13:54:28 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 13:54:28 +0000
commit5e1647232f7e6ee3b71fdfd47b083591773e34e8 (patch)
treea7cb374b63b3de43dc70557d222abf133dbd257f
parentcd2b0f85f5a1d351d7e287caad1e283d21d5d945 (diff)
downloadrabbitmq-server-git-5e1647232f7e6ee3b71fdfd47b083591773e34e8.tar.gz
wait for DLQ to confirm messages before deleting the queue
-rw-r--r--src/rabbit_amqqueue_process.erl39
1 files changed, 23 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 059fa08075..e6fddd1482 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -723,15 +723,22 @@ mk_dead_letter_fun(Reason, _State) ->
BQS1
end.
-dead_letter_deleted_queue(_From, State = #q{dlx = undefined}) ->
- State;
+dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) ->
+ {stop, normal, State};
+dead_letter_deleted_queue(_From, State = #q{dlx = undefined,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- BQS1 = BQ:dropwhile(fun (_) -> true end,
- mk_dead_letter_fun(queue_deleted, State),
- BQS),
- State#q{backing_queue_state = BQS1,
- blocked_op = {delete, {From, BQ:len(BQS)}}}.
+ case BQ:len(BQS) of
+ 0 -> dead_letter_deleted_queue(From, State#q{dlx = undefined});
+ _ -> BQS1 = BQ:dropwhile(fun (_) -> true end,
+ mk_dead_letter_fun(queue_deleted, State),
+ BQS),
+ noreply(State#q{blocked_op = {delete, {From, BQ:len(BQS)}},
+ backing_queue_state = BQS1})
+ end.
dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
unconfirmed = UC,
@@ -1077,7 +1084,7 @@ handle_call({delete, IfUnused, IfEmpty}, From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- noreply(dead_letter_deleted_queue(From, State))
+ dead_letter_deleted_queue(From, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1133,8 +1140,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
end));
handle_cast(delete_immediately, State) ->
- State1 = dead_letter_deleted_queue(undefined, State),
- noreply(State1);
+ dead_letter_deleted_queue(undefined, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1195,10 +1201,9 @@ handle_cast({confirm, MsgSeqNos, _From},
{BQS3, UC3} =
lists:foldl(
fun (MsgSeqNo, {BQS1, UC1}) ->
- Reason = gb_trees:get(MsgSeqNo, UC1),
%% FIXME Forward confirms to channels
{_, BQS2} =
- case Reason of
+ case gb_trees:get(MsgSeqNo, UC1) of
{Reason, AckTag} when Reason =:= expired;
Reason =:= rejected;
Reason =:= queue_deleted;
@@ -1215,10 +1220,13 @@ handle_cast({confirm, MsgSeqNos, _From},
gen_server2:reply(From, {ok, Count}),
noreply(State1);
{true, {delete, {From, Count}}} ->
- gen_server2:reply(From, {ok, Count}),
+ case From of
+ undefined -> ok;
+ _ -> gen_server2:reply(From, {ok, Count})
+ end,
{stop, normal, State1};
_ ->
- noreply(State#q{blocked_op = Op})
+ noreply(State1#q{blocked_op = Op})
end;
handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
@@ -1227,8 +1235,7 @@ handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- State1 = dead_letter_deleted_queue(undefined, State),
- {stop, normal, State1};
+ dead_letter_deleted_queue(undefined, State);
false -> noreply(ensure_expiry_timer(State))
end;