diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-09 22:38:08 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-09 22:38:08 +0100 |
| commit | 4dcba9cdbd04df45a562b36c01949bf72e69d838 (patch) | |
| tree | 3ee50f6e1170e4123ef17b163be2f09e6afd8907 /src | |
| parent | 9c60db6b5eaf9d036d89f70c4a1b9c041e81ab30 (diff) | |
| download | rabbitmq-server-git-4dcba9cdbd04df45a562b36c01949bf72e69d838.tar.gz | |
delete exclusive queues in parallel
Delete_exclusive is only ever used by the queue collector, which
discards its reply, so turn it into a cast.
Since exclusive queue deletion is now asynchronous, just fire off all
the queue deletions at once and gather their DOWN messages.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 30 |
3 files changed, 28 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94d3808b25..30d80da047 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -363,7 +363,7 @@ emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). delete_exclusive(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, delete_exclusive, infinity). + gen_server2:cast(QPid, delete_exclusive). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 61204deb6c..40605d4202 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of update_ram_duration -> 8; + delete_exclusive -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; @@ -787,13 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, ensure_expiry_timer(State)); -handle_call(delete_exclusive, _From, - State = #q{ backing_queue_state = BQS, - backing_queue = BQ, - q = #amqqueue{exclusive_owner = Owner} - }) when Owner =/= none -> - {stop, normal, {ok, BQ:len(BQS)}, State}; - handle_call(delete_exclusive, _From, State) -> reply({error, not_exclusive}, State); @@ -868,6 +861,11 @@ handle_cast({reject, AckTags, Requeue, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); +handle_cast(delete_exclusive, + State = #q{ q = #amqqueue{exclusive_owner = Owner}}) + when Owner =/= none -> + {stop, normal, State}; + handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 0b8efc8f83..f980844267 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -77,24 +77,36 @@ handle_call({register, Q}, _From, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; handle_call(delete_all, _From, State = #state{queues = Queues}) -> + Qs = dict:to_list(Queues), [rabbit_misc:with_exit_handler( fun () -> ok end, - fun () -> - erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete_exclusive(Q) - end) - || {MonitorRef, Q} <- dict:to_list(Queues)], - {reply, ok, State}. + fun () -> rabbit_amqqueue:delete_exclusive(Q) end) + || {_MRef, Q} <- Qs], + {reply, ok, wait_DOWNs(gb_sets:from_list([MRef || {MRef, _Q} <- Qs]), + State)}. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues}) -> - {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State) -> + {noreply, erase_queue(MonitorRef, State)}. terminate(_Reason, _State) -> + rabbit_log:info("collector terminated~n"), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + +wait_DOWNs(MRefs, State) -> + case gb_sets:is_empty(MRefs) of + true -> State; + false -> receive + {'DOWN', MRef, process, _DownPid, _Reason} -> + wait_DOWNs(gb_sets:del_element(MRef, MRefs), + erase_queue(MRef, State)) + end + end. + +erase_queue(MRef, State = #state{queues = Queues}) -> + State#state{queues = dict:erase(MRef, Queues)}. |
