diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-17 11:01:33 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-17 11:01:33 +0000 |
| commit | 22f3580833dd1c46a678ec3b87d1fdde8a8aaf6c (patch) | |
| tree | 2692d5a62590c73115135a5d6b9f58f324131039 /src | |
| parent | f84d75d6c7db92d973bd724294ad278867c22bb0 (diff) | |
| download | rabbitmq-server-git-22f3580833dd1c46a678ec3b87d1fdde8a8aaf6c.tar.gz | |
delay stopping the queue until all confirms have been received
The only two cases we don't delay are when 1) there are no outstanding
confirms and 2) if the queue is blowing up (e.g. because it's received an
'EXIT').
Also, small refactor for rabbit_misc:is_abnormal_termination/1.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 |
3 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 166a9576aa..5305935b49 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,7 +53,7 @@ publish_seqno, unconfirmed_mq, unconfirmed_qm, - blocked_ops, + delayed_delete, queue_monitors, dlx, dlx_routing_key @@ -140,7 +140,7 @@ init(Q) -> publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_ops = [], + delayed_delete = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, @@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), - blocked_ops = [], + delayed_delete = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( @@ -835,16 +835,28 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, backing_queue_state = BQS1}). -cleanup_after_confirm(State = #q{blocked_ops = Ops, +stop_later(Reason, State) -> + stop_later(Reason, undefined, noreply, State). + +stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> + case {gb_trees:is_empty(UMQ), Reply} of + {true, noreply} -> + {stop, Reason, State}; + {true, _} -> + {stop, Reason, Reply, State}; + {false, _} -> + noreply(State#q{delayed_delete = {Reason, {From, Reply}}}) + end. + +cleanup_after_confirm(State = #q{delayed_delete = DD, unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso Ops =/= [] of - true -> [gen_server2:reply(From, {ok, Count}) || - {_, {From, Count}} <- Ops, From =/= undefined], - State1 = State#q{blocked_ops = []}, - case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of - true -> {stop, normal, State1}; - false -> noreply(State1) - end; + case gb_trees:is_empty(UMQ) andalso DD =/= undefined of + true -> case DD of + {_, noreply} -> ok; + {_, {From, Reply}} -> gen_server2:reply(From, Reply) + end, + {Reason, _} = DD, + {stop, Reason, State}; false -> noreply(State) end. @@ -1119,7 +1131,7 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({notify_down, ChPid}, _From, State) -> +handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we @@ -1127,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) -> %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> {stop, normal, ok, State1} + {stop, State1} -> stop_later(normal, From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -1182,7 +1194,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply(ok, State2) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1202,7 +1214,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> {stop, normal, ok, State1} + true -> stop_later(normal, From, ok, State1) end end; @@ -1211,14 +1223,15 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> {stop, normal, {ok, BQ:len(BQS)}, State} + true -> stop_later(normal, From, + {ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1280,7 +1293,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - {stop, normal, State}; + stop_later(normal, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1341,7 +1354,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> {stop, normal, State}; + true -> stop_later(normal, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1363,11 +1376,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - {stop, normal, State}; + stop_later(normal, State); handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, State1} -> handle_queue_down(DownPid, Reason, State1); - {stop, State1} -> {stop, normal, State1} + {stop, State1} -> stop_later(normal, State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f2bf3481e0..f84de829b9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1160,7 +1160,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; + true -> {true, fun send_nacks/2}; false -> {false, fun record_confirms/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 226470a53c..6d8bed83b8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -404,16 +404,11 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). -is_abnormal_termination(Reason) -> - case Reason of - Reason when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> - false; - {shutdown, _} -> - false; - _ -> - true - end. +is_abnormal_termination(Reason) + when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_) -> true. with_user(Username, Thunk) -> fun () -> |
