summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-09 22:38:08 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-09 22:38:08 +0100
commit4dcba9cdbd04df45a562b36c01949bf72e69d838 (patch)
tree3ee50f6e1170e4123ef17b163be2f09e6afd8907 /src
parent9c60db6b5eaf9d036d89f70c4a1b9c041e81ab30 (diff)
downloadrabbitmq-server-git-4dcba9cdbd04df45a562b36c01949bf72e69d838.tar.gz
delete exclusive queues in parallel
Delete_exclusive is only ever used by the queue collector, which discards its reply, so turn it into a cast. Since exclusive queue deletion is now asynchronous, just fire off all the queue deletions at once and gather their DOWN messages.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_queue_collector.erl30
3 files changed, 28 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 94d3808b25..30d80da047 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -363,7 +363,7 @@ emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
delete_exclusive(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, delete_exclusive, infinity).
+ gen_server2:cast(QPid, delete_exclusive).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 61204deb6c..40605d4202 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
update_ram_duration -> 8;
+ delete_exclusive -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
maybe_expire -> 8;
@@ -787,13 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
ensure_expiry_timer(State));
-handle_call(delete_exclusive, _From,
- State = #q{ backing_queue_state = BQS,
- backing_queue = BQ,
- q = #amqqueue{exclusive_owner = Owner}
- }) when Owner =/= none ->
- {stop, normal, {ok, BQ:len(BQS)}, State};
-
handle_call(delete_exclusive, _From, State) ->
reply({error, not_exclusive}, State);
@@ -868,6 +861,11 @@ handle_cast({reject, AckTags, Requeue, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
+handle_cast(delete_exclusive,
+ State = #q{ q = #amqqueue{exclusive_owner = Owner}})
+ when Owner =/= none ->
+ {stop, normal, State};
+
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0b8efc8f83..f980844267 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -77,24 +77,36 @@ handle_call({register, Q}, _From,
State#state{queues = dict:store(MonitorRef, Q, Queues)}};
handle_call(delete_all, _From, State = #state{queues = Queues}) ->
+ Qs = dict:to_list(Queues),
[rabbit_misc:with_exit_handler(
fun () -> ok end,
- fun () ->
- erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete_exclusive(Q)
- end)
- || {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State}.
+ fun () -> rabbit_amqqueue:delete_exclusive(Q) end)
+ || {_MRef, Q} <- Qs],
+ {reply, ok, wait_DOWNs(gb_sets:from_list([MRef || {MRef, _Q} <- Qs]),
+ State)}.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues}) ->
- {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State) ->
+ {noreply, erase_queue(MonitorRef, State)}.
terminate(_Reason, _State) ->
+ rabbit_log:info("collector terminated~n"),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+wait_DOWNs(MRefs, State) ->
+ case gb_sets:is_empty(MRefs) of
+ true -> State;
+ false -> receive
+ {'DOWN', MRef, process, _DownPid, _Reason} ->
+ wait_DOWNs(gb_sets:del_element(MRef, MRefs),
+ erase_queue(MRef, State))
+ end
+ end.
+
+erase_queue(MRef, State = #state{queues = Queues}) ->
+ State#state{queues = dict:erase(MRef, Queues)}.