summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-03-23 14:11:47 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-03-23 14:11:47 +0000
commit8f25204f353b651e6b856b3aaba6dd4fef3b6fc8 (patch)
tree1aff5f5efa71c9f5b7581dabb05f38031efd4f29 /src
parentea0439d1886f187ab7d1593cc5dfebf72753b96c (diff)
downloadrabbitmq-server-git-8f25204f353b651e6b856b3aaba6dd4fef3b6fc8.tar.gz
monitor queue senders with pmon
...which simplifies the pmon API and the mirror slave promotion
Diffstat (limited to 'src')
-rw-r--r--src/pmon.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
3 files changed, 33 insertions, 39 deletions
diff --git a/src/pmon.erl b/src/pmon.erl
index 171386a0fd..457865774b 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -17,7 +17,7 @@
-module(pmon).
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, to_list/1, is_empty/1]).
+ monitored/1, is_empty/1]).
-ifdef(use_specs).
@@ -34,7 +34,6 @@
-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
-spec(monitored/1 :: (?MODULE()) -> [pid()]).
--spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-endif.
@@ -62,6 +61,4 @@ erase(Pid, M) -> dict:erase(Pid, M).
monitored(M) -> dict:fetch_keys(M).
-to_list(M) -> dict:to_list(M).
-
is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 456fc77c21..606848ee58 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,7 +29,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/8]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -50,6 +50,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ senders,
publish_seqno,
unconfirmed_mq,
unconfirmed_qm,
@@ -78,9 +79,9 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/7 ::
+-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -135,6 +136,7 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
@@ -147,7 +149,7 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, MTC) ->
+ RateTRef, AckTags, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -163,6 +165,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = Senders,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
@@ -619,16 +622,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- case get({ch_publisher, DownPid}) of
- undefined -> ok;
- MRef -> erlang:demonitor(MRef),
- erase({ch_publisher, DownPid}),
- credit_flow:peer_down(DownPid)
- end,
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ senders = Senders}) ->
+ Senders1 = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> ok = credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end,
case lookup_ch(DownPid) of
not_found ->
- {ok, State};
+ {ok, State#q{senders = Senders1}};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
@@ -640,7 +643,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers)},
+ ChPid, State#q.active_consumers),
+ senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -1240,22 +1244,19 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender,
msg_seq_no = MsgSeqNo}, Flow},
- State) ->
+ State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- case already_been_here(Delivery, State) of
- false -> noreply(deliver_or_enqueue(Delivery, State));
+ Senders1 = case Flow of
+ flow -> ok = credit_flow:ack(Sender),
+ pmon:monitor(Sender, Senders);
+ noflow -> ok
+ end,
+ State1 = State#q{senders = Senders1},
+ case already_been_here(Delivery, State1) of
+ false -> noreply(deliver_or_enqueue(Delivery, State1));
Qs -> log_cycle_once(Qs),
rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
+ noreply(State1)
end;
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 354ff7bc1c..f0dff3548a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
-
- MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
- Pid
- end || {Pid, MRef} <- pmon:to_list(KS)],
- ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
- CPid, MonitoringPids),
+ MPids = pmon:monitored(KS),
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MonitoringPids),
+ CPid, BQ, BQS, GM, SS, MPids),
MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, MTC),
+ AckTags, Deliveries, KS, MTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->