diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 |
3 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 166a9576aa..5305935b49 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,7 +53,7 @@ publish_seqno, unconfirmed_mq, unconfirmed_qm, - blocked_ops, + delayed_delete, queue_monitors, dlx, dlx_routing_key @@ -140,7 +140,7 @@ init(Q) -> publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_ops = [], + delayed_delete = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, @@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_ops = [], + delayed_delete = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( @@ -835,16 +835,28 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, backing_queue_state = BQS1}). -cleanup_after_confirm(State = #q{blocked_ops = Ops, +stop_later(Reason, State) -> + stop_later(Reason, undefined, noreply, State). + +stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> + case {gb_trees:is_empty(UMQ), Reply} of + {true, noreply} -> + {stop, Reason, State}; + {true, _} -> + {stop, Reason, Reply, State}; + {false, _} -> + noreply(State#q{delayed_delete = {Reason, {From, Reply}}}) + end. + +cleanup_after_confirm(State = #q{delayed_delete = DD, unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso Ops =/= [] of - true -> [gen_server2:reply(From, {ok, Count}) || - {_, {From, Count}} <- Ops, From =/= undefined], - State1 = State#q{blocked_ops = []}, - case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of - true -> {stop, normal, State1}; - false -> noreply(State1) - end; + case gb_trees:is_empty(UMQ) andalso DD =/= undefined of + true -> case DD of + {_, noreply} -> ok; + {_, {From, Reply}} -> gen_server2:reply(From, Reply) + end, + {Reason, _} = DD, + {stop, Reason, State}; false -> noreply(State) end. @@ -1119,7 +1131,7 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({notify_down, ChPid}, _From, State) -> +handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we @@ -1127,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) -> %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> {stop, normal, ok, State1} + {stop, State1} -> stop_later(normal, From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -1182,7 +1194,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply(ok, State2) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1202,7 +1214,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> {stop, normal, ok, State1} + true -> stop_later(normal, From, ok, State1) end end; @@ -1211,14 +1223,15 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> {stop, normal, {ok, BQ:len(BQS)}, State} + true -> stop_later(normal, From, + {ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1280,7 +1293,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - {stop, normal, State}; + stop_later(normal, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1341,7 +1354,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> {stop, normal, State}; + true -> stop_later(normal, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1363,11 +1376,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - {stop, normal, State}; + stop_later(normal, State); handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, State1} -> handle_queue_down(DownPid, Reason, State1); - {stop, State1} -> {stop, normal, State1} + {stop, State1} -> stop_later(normal, State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f2bf3481e0..f84de829b9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1160,7 +1160,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; + true -> {true, fun send_nacks/2}; false -> {false, fun record_confirms/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 226470a53c..6d8bed83b8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -404,16 +404,11 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). -is_abnormal_termination(Reason) -> - case Reason of - Reason when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> - false; - {shutdown, _} -> - false; - _ -> - true - end. +is_abnormal_termination(Reason) + when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_) -> true. with_user(Username, Thunk) -> fun () -> |
