summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-17 11:01:33 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-17 11:01:33 +0000
commit22f3580833dd1c46a678ec3b87d1fdde8a8aaf6c (patch)
tree2692d5a62590c73115135a5d6b9f58f324131039 /src
parentf84d75d6c7db92d973bd724294ad278867c22bb0 (diff)
downloadrabbitmq-server-git-22f3580833dd1c46a678ec3b87d1fdde8a8aaf6c.tar.gz
delay stopping the queue until all confirms have been received
The only two cases we don't delay are when 1) there are no outstanding confirms and 2) if the queue is blowing up (e.g. because it's received an 'EXIT'). Also, small refactor for rabbit_misc:is_abnormal_termination/1.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl57
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_misc.erl15
3 files changed, 41 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 166a9576aa..5305935b49 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,7 +53,7 @@
publish_seqno,
unconfirmed_mq,
unconfirmed_qm,
- blocked_ops,
+ delayed_delete,
queue_monitors,
dlx,
dlx_routing_key
@@ -140,7 +140,7 @@ init(Q) ->
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_ops = [],
+ delayed_delete = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
@@ -166,7 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- blocked_ops = [],
+ delayed_delete = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
@@ -835,16 +835,28 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
backing_queue_state = BQS1}).
-cleanup_after_confirm(State = #q{blocked_ops = Ops,
+stop_later(Reason, State) ->
+ stop_later(Reason, undefined, noreply, State).
+
+stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
+ case {gb_trees:is_empty(UMQ), Reply} of
+ {true, noreply} ->
+ {stop, Reason, State};
+ {true, _} ->
+ {stop, Reason, Reply, State};
+ {false, _} ->
+ noreply(State#q{delayed_delete = {Reason, {From, Reply}}})
+ end.
+
+cleanup_after_confirm(State = #q{delayed_delete = DD,
unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso Ops =/= [] of
- true -> [gen_server2:reply(From, {ok, Count}) ||
- {_, {From, Count}} <- Ops, From =/= undefined],
- State1 = State#q{blocked_ops = []},
- case lists:any(fun({Rsn, _}) -> Rsn =:= delete end, Ops) of
- true -> {stop, normal, State1};
- false -> noreply(State1)
- end;
+ case gb_trees:is_empty(UMQ) andalso DD =/= undefined of
+ true -> case DD of
+ {_, noreply} -> ok;
+ {_, {From, Reply}} -> gen_server2:reply(From, Reply)
+ end,
+ {Reason, _} = DD,
+ {stop, Reason, State};
false -> noreply(State)
end.
@@ -1119,7 +1131,7 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
-handle_call({notify_down, ChPid}, _From, State) ->
+handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
@@ -1127,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) ->
%% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> {stop, normal, ok, State1}
+ {stop, State1} -> stop_later(normal, From, ok, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -1182,7 +1194,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply(ok, State2)
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1202,7 +1214,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> {stop, normal, ok, State1}
+ true -> stop_later(normal, From, ok, State1)
end
end;
@@ -1211,14 +1223,15 @@ handle_call(stat, _From, State) ->
drop_expired_messages(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> {stop, normal, {ok, BQ:len(BQS)}, State}
+ true -> stop_later(normal, From,
+ {ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1280,7 +1293,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
end));
handle_cast(delete_immediately, State) ->
- {stop, normal, State};
+ stop_later(normal, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1341,7 +1354,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> {stop, normal, State};
+ true -> stop_later(normal, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1363,11 +1376,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- {stop, normal, State};
+ stop_later(normal, State);
handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> handle_queue_down(DownPid, Reason, State1);
- {stop, State1} -> {stop, normal, State1}
+ {stop, State1} -> stop_later(normal, State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f2bf3481e0..f84de829b9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1160,7 +1160,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
+ true -> {true, fun send_nacks/2};
false -> {false, fun record_confirms/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 226470a53c..6d8bed83b8 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -404,16 +404,11 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
-is_abnormal_termination(Reason) ->
- case Reason of
- Reason when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown ->
- false;
- {shutdown, _} ->
- false;
- _ ->
- true
- end.
+is_abnormal_termination(Reason)
+ when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown -> false;
+is_abnormal_termination({shutdown, _}) -> false;
+is_abnormal_termination(_) -> true.
with_user(Username, Thunk) ->
fun () ->