diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 11:26:10 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 11:26:10 +0000 |
| commit | bd4b6cd7f5f84c19f125acc2eb8f04ba0f8cd730 (patch) | |
| tree | 24554fdd7f8d99b1a40799ffac3ffe18f57c2653 /src | |
| parent | d7b3c452ff5f2f0989de3cb5e330b660f3a7643b (diff) | |
| download | rabbitmq-server-git-bd4b6cd7f5f84c19f125acc2eb8f04ba0f8cd730.tar.gz | |
use pmon in queue_collector
this required a tweak to the amqqueue API - making
delete_immediately/1 take a list of qpids rather than a single
#amqqueue record.
Diffstat (limited to 'src')
| -rw-r--r-- | src/pmon.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 36 |
4 files changed, 25 insertions, 27 deletions
diff --git a/src/pmon.erl b/src/pmon.erl index 61c1657839..457865774b 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -17,7 +17,7 @@ -module(pmon). -export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - to_list/1, is_empty/1]). + monitored/1, is_empty/1]). -ifdef(use_specs). @@ -33,7 +33,7 @@ -spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). -spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). -spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). --spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]). +-spec(monitored/1 :: (?MODULE()) -> [pid()]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. @@ -59,6 +59,6 @@ is_monitored(Pid, M) -> dict:is_key(Pid, M). erase(Pid, M) -> dict:erase(Pid, M). -to_list(M) -> dict:to_list(M). +monitored(M) -> dict:fetch_keys(M). is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9b6f14ca06..4f06630b90 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_immediately/1 :: (qpids()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -462,8 +462,8 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -delete_immediately(#amqqueue{ pid = QPid }) -> - gen_server2:cast(QPid, delete_immediately). +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids]. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0a9643cf4a..081af7667e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -974,7 +974,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q = #amqqueue{}} -> + {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as @@ -982,7 +982,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( - CollectorPid, Q) + CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index df957d883c..6ebde2aab5 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues, delete_from}). +-record(state, {monitors, delete_from}). -include("rabbit.hrl"). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). -endif. @@ -51,39 +51,37 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new(), delete_from = undefined}}. + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- -handle_call({register, Q}, _From, - State = #state{queues = Queues, delete_from = Deleting}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> case Deleting of undefined -> ok; - _ -> rabbit_amqqueue:delete_immediately(Q) + _ -> rabbit_amqqueue:delete_immediately(QPid) end, - {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; -handle_call(delete_all, From, State = #state{queues = Queues, +handle_call(delete_all, From, State = #state{monitors = QMons, delete_from = undefined}) -> - case dict:size(Queues) of - 0 -> {reply, ok, State#state{delete_from = From}}; - _ -> [rabbit_amqqueue:delete_immediately(Q) - || {_MRef, Q} <- dict:to_list(Queues)], - {noreply, State#state{delete_from = From}} + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues, delete_from = Deleting}) -> - Queues1 = dict:erase(MonitorRef, Queues), - case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of true -> gen_server:reply(Deleting, ok); false -> ok end, - {noreply, State#state{queues = Queues1}}. + {noreply, State#state{monitors = QMons1}}. terminate(_Reason, _State) -> ok. |
