diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 13:10:21 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 13:10:21 +0000 |
| commit | ea0439d1886f187ab7d1593cc5dfebf72753b96c (patch) | |
| tree | 9d9141f858c8b167264ab886119df7aa44ee0063 /src | |
| parent | 714023885da25ac727c58bc6346b89330f80b7ee (diff) | |
| download | rabbitmq-server-git-ea0439d1886f187ab7d1593cc5dfebf72753b96c.tar.gz | |
use pmon in mirror_queue_slave
Diffstat (limited to 'src')
| -rw-r--r-- | src/pmon.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 41 |
2 files changed, 21 insertions, 25 deletions
diff --git a/src/pmon.erl b/src/pmon.erl index 457865774b..171386a0fd 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -17,7 +17,7 @@ -module(pmon). -export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). + monitored/1, to_list/1, is_empty/1]). -ifdef(use_specs). @@ -34,6 +34,7 @@ -spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). -spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). -spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. @@ -61,4 +62,6 @@ erase(Pid, M) -> dict:erase(Pid, M). monitored(M) -> dict:fetch_keys(M). +to_list(M) -> dict:to_list(M). + is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 98a80a2619..354ff7bc1c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new(), + known_senders = pmon:new(), synchronised = false }, @@ -462,7 +462,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid - end || {Pid, MRef} <- dict:to_list(KS)], + end || {Pid, MRef} <- pmon:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( CPid, MonitoringPids), @@ -605,14 +605,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - case dict:is_key(ChPid, KS) of - true -> State; - false -> MRef = erlang:monitor(process, ChPid), - State #state { known_senders = dict:store(ChPid, MRef, KS) } - end. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> - ok = case dict:is_key(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> credit_flow:peer_down(ChPid), confirm_sender_death(ChPid) @@ -628,7 +624,7 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave - ok = case dict:is_key(Pid, KS) of + ok = case pmon:is_monitored(KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -871,21 +867,18 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - {ok, case dict:find(ChPid, KS) of - error -> - State; - {ok, MRef} -> - true = erlang:demonitor(MRef), - MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = dict:erase(ChPid, KS) } + {ok, case pmon:is_monitored(ChPid, KS) of + false -> State; + true -> MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length}, State = #state { backing_queue = BQ, |
