summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-03-23 11:26:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-03-23 11:26:10 +0000
commitbd4b6cd7f5f84c19f125acc2eb8f04ba0f8cd730 (patch)
tree24554fdd7f8d99b1a40799ffac3ffe18f57c2653 /src
parentd7b3c452ff5f2f0989de3cb5e330b660f3a7643b (diff)
downloadrabbitmq-server-git-bd4b6cd7f5f84c19f125acc2eb8f04ba0f8cd730.tar.gz
use pmon in queue_collector
this required a tweak to the amqqueue API - making delete_immediately/1 take a list of qpids rather than a single #amqqueue record.
Diffstat (limited to 'src')
-rw-r--r--src/pmon.erl6
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_queue_collector.erl36
4 files changed, 25 insertions, 27 deletions
diff --git a/src/pmon.erl b/src/pmon.erl
index 61c1657839..457865774b 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -17,7 +17,7 @@
-module(pmon).
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- to_list/1, is_empty/1]).
+ monitored/1, is_empty/1]).
-ifdef(use_specs).
@@ -33,7 +33,7 @@
-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
--spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]).
+-spec(monitored/1 :: (?MODULE()) -> [pid()]).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-endif.
@@ -59,6 +59,6 @@ is_monitored(Pid, M) -> dict:is_key(Pid, M).
erase(Pid, M) -> dict:erase(Pid, M).
-to_list(M) -> dict:to_list(M).
+monitored(M) -> dict:fetch_keys(M).
is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9b6f14ca06..4f06630b90 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_immediately/1 :: (qpids()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -462,8 +462,8 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-delete_immediately(#amqqueue{ pid = QPid }) ->
- gen_server2:cast(QPid, delete_immediately).
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids].
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0a9643cf4a..081af7667e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -974,7 +974,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
- {new, Q = #amqqueue{}} ->
+ {new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
@@ -982,7 +982,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
- CollectorPid, Q)
+ CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index df957d883c..6ebde2aab5 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues, delete_from}).
+-record(state, {monitors, delete_from}).
-include("rabbit.hrl").
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -51,39 +51,37 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new(), delete_from = undefined}}.
+ {ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
-handle_call({register, Q}, _From,
- State = #state{queues = Queues, delete_from = Deleting}) ->
- MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+handle_call({register, QPid}, _From,
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
case Deleting of
undefined -> ok;
- _ -> rabbit_amqqueue:delete_immediately(Q)
+ _ -> rabbit_amqqueue:delete_immediately(QPid)
end,
- {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+ {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
-handle_call(delete_all, From, State = #state{queues = Queues,
+handle_call(delete_all, From, State = #state{monitors = QMons,
delete_from = undefined}) ->
- case dict:size(Queues) of
- 0 -> {reply, ok, State#state{delete_from = From}};
- _ -> [rabbit_amqqueue:delete_immediately(Q)
- || {_MRef, Q} <- dict:to_list(Queues)],
- {noreply, State#state{delete_from = From}}
+ case pmon:monitored(QMons) of
+ [] -> {reply, ok, State#state{delete_from = From}};
+ QPids -> rabbit_amqqueue:delete_immediately(QPids),
+ {noreply, State#state{delete_from = From}}
end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues, delete_from = Deleting}) ->
- Queues1 = dict:erase(MonitorRef, Queues),
- case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+handle_info({'DOWN', _MRef, process, DownPid, _Reason},
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ QMons1 = pmon:erase(DownPid, QMons),
+ case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
true -> gen_server:reply(Deleting, ok);
false -> ok
end,
- {noreply, State#state{queues = Queues1}}.
+ {noreply, State#state{monitors = QMons1}}.
terminate(_Reason, _State) ->
ok.