summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pmon.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl41
2 files changed, 21 insertions, 25 deletions
diff --git a/src/pmon.erl b/src/pmon.erl
index 457865774b..171386a0fd 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -17,7 +17,7 @@
-module(pmon).
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+ monitored/1, to_list/1, is_empty/1]).
-ifdef(use_specs).
@@ -34,6 +34,7 @@
-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
-spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-endif.
@@ -61,4 +62,6 @@ erase(Pid, M) -> dict:erase(Pid, M).
monitored(M) -> dict:fetch_keys(M).
+to_list(M) -> dict:to_list(M).
+
is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 98a80a2619..354ff7bc1c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new(),
+ known_senders = pmon:new(),
synchronised = false
},
@@ -462,7 +462,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
- end || {Pid, MRef} <- dict:to_list(KS)],
+ end || {Pid, MRef} <- pmon:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
CPid, MonitoringPids),
@@ -605,14 +605,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- case dict:is_key(ChPid, KS) of
- true -> State;
- false -> MRef = erlang:monitor(process, ChPid),
- State #state { known_senders = dict:store(ChPid, MRef, KS) }
- end.
+ State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
- ok = case dict:is_key(ChPid, KS) of
+ ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> credit_flow:peer_down(ChPid),
confirm_sender_death(ChPid)
@@ -628,7 +624,7 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
- ok = case dict:is_key(Pid, KS) of
+ ok = case pmon:is_monitored(KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -871,21 +867,18 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- {ok, case dict:find(ChPid, KS) of
- error ->
- State;
- {ok, MRef} ->
- true = erlang:demonitor(MRef),
- MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = dict:erase(ChPid, KS) }
+ {ok, case pmon:is_monitored(ChPid, KS) of
+ false -> State;
+ true -> MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({length, Length},
State = #state { backing_queue = BQ,