summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl36
1 files changed, 24 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a631990bff..30822aa6ae 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -733,12 +733,19 @@ mk_dead_letter_fun(Reason, _State) ->
BQS1
end.
+dead_letter_deleted_queue_reply(From, State) ->
+ case dead_letter_deleted_queue(From, State) of
+ {stop, State1} -> {stop, normal, State1};
+ {stop, Count, State1} -> {stop, normal, {ok, Count}, State1};
+ {ok, State1} -> noreply(State1)
+ end.
+
dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) ->
- {stop, normal, State};
+ {stop, State};
dead_letter_deleted_queue(_From, State = #q{dlx = undefined,
backing_queue_state = BQS,
backing_queue = BQ}) ->
- {stop, normal, {ok, BQ:len(BQS)}, State};
+ {stop, BQ:len(BQS), State};
dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
backing_queue = BQ,
blocked_ops = Ops}) ->
@@ -752,8 +759,8 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
true -> Ops; %% don't queue more than one delete
false -> [{delete, {From, BQ:len(BQS)}} | Ops]
end,
- noreply(State#q{blocked_ops = Ops1,
- backing_queue_state = BQS1})
+ {ok, State#q{blocked_ops = Ops1,
+ backing_queue_state = BQS1}}
end.
dead_letter_msg(Msg, AckTag, Reason,
@@ -1143,8 +1150,8 @@ handle_call({notify_down, ChPid}, _From, State) ->
%% return stop with a reply, terminate/2 will be called by
%% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
- {ok, NewState} -> reply(ok, NewState);
- {stop, NewState} -> {stop, normal, ok, NewState}
+ {ok, State1} -> reply(ok, State1);
+ {stop, State1} -> dead_letter_deleted_queue_reply(undefined, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -1219,7 +1226,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> {stop, normal, ok, State1}
+ true -> dead_letter_deleted_queue_reply(undefined, State1)
end
end;
@@ -1235,7 +1242,8 @@ handle_call({delete, IfUnused, IfEmpty}, From,
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> dead_letter_deleted_queue(From, State)
+ true -> dead_letter_deleted_queue_reply(From,
+ State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1311,7 +1319,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
end));
handle_cast(delete_immediately, State) ->
- dead_letter_deleted_queue(undefined, State);
+ dead_letter_deleted_queue_reply(undefined, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1372,7 +1380,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> dead_letter_deleted_queue(undefined, State);
+ true -> dead_letter_deleted_queue_reply(undefined, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1394,11 +1402,15 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- {stop, normal, State};
+ case dead_letter_deleted_queue(undefined, State) of
+ {stop, State1} -> {stop, normal, State1};
+ {stop, _, State1} -> {stop, normal, State1};
+ {ok, State1} -> noreply(State1)
+ end;
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> handle_queue_down(DownPid, State1);
- {stop, State1} -> {stop, normal, State1}
+ {stop, State1} -> dead_letter_deleted_queue(undefined, State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,