summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl57
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_misc.erl15
3 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 166a9576aa..5305935b49 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,7 +53,7 @@
publish_seqno,
unconfirmed_mq,
unconfirmed_qm,
- blocked_ops,
+ delayed_delete,
queue_monitors,
dlx,
dlx_routing_key
@@ -140,7 +140,7 @@ init(Q) ->
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_ops = [],
+ delayed_delete = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
@@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_ops = [],
+ delayed_delete = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
@@ -835,16 +835,28 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
backing_queue_state = BQS1}).
-cleanup_after_confirm(State = #q{blocked_ops = Ops,
+stop_later(Reason, State) ->
+ stop_later(Reason, undefined, noreply, State).
+
+stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
+ case {gb_trees:is_empty(UMQ), Reply} of
+ {true, noreply} ->
+ {stop, Reason, State};
+ {true, _} ->
+ {stop, Reason, Reply, State};
+ {false, _} ->
+ noreply(State#q{delayed_delete = {Reason, {From, Reply}}})
+ end.
+
+cleanup_after_confirm(State = #q{delayed_delete = DD,
unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso Ops =/= [] of
- true -> [gen_server2:reply(From, {ok, Count}) ||
- {_, {From, Count}} <- Ops, From =/= undefined],
- State1 = State#q{blocked_ops = []},
- case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
- true -> {stop, normal, State1};
- false -> noreply(State1)
- end;
+ case gb_trees:is_empty(UMQ) andalso DD =/= undefined of
+ true -> case DD of
+ {_, noreply} -> ok;
+ {_, {From, Reply}} -> gen_server2:reply(From, Reply)
+ end,
+ {Reason, _} = DD,
+ {stop, Reason, State};
false -> noreply(State)
end.
@@ -1119,7 +1131,7 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
-handle_call({notify_down, ChPid}, _From, State) ->
+handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
@@ -1127,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) ->
%% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> {stop, normal, ok, State1}
+ {stop, State1} -> stop_later(normal, From, ok, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -1182,7 +1194,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply(ok, State2)
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1202,7 +1214,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> {stop, normal, ok, State1}
+ true -> stop_later(normal, From, ok, State1)
end
end;
@@ -1211,14 +1223,15 @@ handle_call(stat, _From, State) ->
drop_expired_messages(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> {stop, normal, {ok, BQ:len(BQS)}, State}
+ true -> stop_later(normal, From,
+ {ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1280,7 +1293,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
end));
handle_cast(delete_immediately, State) ->
- {stop, normal, State};
+ stop_later(normal, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1341,7 +1354,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> {stop, normal, State};
+ true -> stop_later(normal, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1363,11 +1376,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- {stop, normal, State};
+ stop_later(normal, State);
handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> handle_queue_down(DownPid, Reason, State1);
- {stop, State1} -> {stop, normal, State1}
+ {stop, State1} -> stop_later(normal, State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f2bf3481e0..f84de829b9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1160,7 +1160,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
+ true -> {true, fun send_nacks/2};
false -> {false, fun record_confirms/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 226470a53c..6d8bed83b8 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -404,16 +404,11 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
-is_abnormal_termination(Reason) ->
- case Reason of
- Reason when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown ->
- false;
- {shutdown, _} ->
- false;
- _ ->
- true
- end.
+is_abnormal_termination(Reason)
+ when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown -> false;
+is_abnormal_termination({shutdown, _}) -> false;
+is_abnormal_termination(_) -> true.
with_user(Username, Thunk) ->
fun () ->