summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-17 12:06:35 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-17 12:06:35 +0000
commit5a8da9113aaf35ecf4c599c14f51a63f84f0911a (patch)
tree0961059e041054f57abd1bb15617155e4bd149ed
parent8cb2906111da6eace1c2686ed8ec4573b2c992e2 (diff)
downloadrabbitmq-server-git-5a8da9113aaf35ecf4c599c14f51a63f84f0911a.tar.gz
ignore all messages after queue deletion, except for confirms
Downside is that `rabbitmqctl list_queues` will hang until the dying queues stop completly.
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 14 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0395244b57..c357861160 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -838,8 +838,7 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ,
- delayed_delete = undefined}) ->
+stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
case {gb_trees:is_empty(UMQ), Reply} of
{true, noreply} ->
{stop, Reason, State};
@@ -847,11 +846,7 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ,
{stop, Reason, Reply, State};
{false, _} ->
noreply(State#q{delayed_delete = {Reason, {From, Reply}}})
- end;
-stop_later(_, _, _, State) ->
- %% All subsequent attempts to stop a stopping queue will hang; the
- %% caller will eventually receive a 'noproc'.
- noreply(State).
+ end.
cleanup_after_confirm(State = #q{delayed_delete = DD,
unconfirmed_mq = UMQ}) ->
@@ -1080,6 +1075,9 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
+handle_call(_, _, State = #q{delayed_delete = DD}) when DD =/= undefined ->
+ noreply(State);
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1250,6 +1248,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end)).
+handle_cast({confirm, MsgSeqNos, QPid}, State) ->
+ handle_confirm(MsgSeqNos, QPid, State);
+
+handle_cast(_, State = #q{delayed_delete = DD}) when DD =/= undefined ->
+ noreply(State);
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1351,12 +1355,12 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
end,
noreply(State);
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
-
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
dead_letter_msg(Msg, AckTag, Reason, State).
+handle_info(_, State = #q{delayed_delete = DD}) when DD =/= undefined ->
+ noreply(State);
+
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop_later(normal, State);