diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 14:11:47 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 14:11:47 +0000 |
| commit | 8f25204f353b651e6b856b3aaba6dd4fef3b6fc8 (patch) | |
| tree | 1aff5f5efa71c9f5b7581dabb05f38031efd4f29 /src | |
| parent | ea0439d1886f187ab7d1593cc5dfebf72753b96c (diff) | |
| download | rabbitmq-server-git-8f25204f353b651e6b856b3aaba6dd4fef3b6fc8.tar.gz | |
monitor queue senders with pmon
...which simplifies the pmon API and the mirror slave promotion
Diffstat (limited to 'src')
| -rw-r--r-- | src/pmon.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
3 files changed, 33 insertions, 39 deletions
diff --git a/src/pmon.erl b/src/pmon.erl index 171386a0fd..457865774b 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -17,7 +17,7 @@ -module(pmon). -export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, to_list/1, is_empty/1]). + monitored/1, is_empty/1]). -ifdef(use_specs). @@ -34,7 +34,6 @@ -spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). -spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). -spec(monitored/1 :: (?MODULE()) -> [pid()]). --spec(to_list/1 :: (?MODULE()) -> [{pid(), reference()}]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. @@ -62,6 +61,4 @@ erase(Pid, M) -> dict:erase(Pid, M). monitored(M) -> dict:fetch_keys(M). -to_list(M) -> dict:to_list(M). - is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 456fc77c21..606848ee58 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,7 +29,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -50,6 +50,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed_mq, unconfirmed_qm, @@ -78,9 +79,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -135,6 +136,7 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, @@ -147,7 +149,7 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -163,6 +165,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), @@ -619,16 +622,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> ok = credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -640,7 +643,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -1240,22 +1244,19 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender, msg_seq_no = MsgSeqNo}, Flow}, - State) -> + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); + Senders1 = case Flow of + flow -> ok = credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> ok + end, + State1 = State#q{senders = Senders1}, + case already_been_here(Delivery, State1) of + false -> noreply(deliver_or_enqueue(Delivery, State1)); Qs -> log_cycle_once(Qs), rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) + noreply(State1) end; handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 354ff7bc1c..f0dff3548a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], dict:new()), + Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - - MonitoringPids = [begin put({ch_publisher, Pid}, MRef), - Pid - end || {Pid, MRef} <- pmon:to_list(KS)], - ok = rabbit_mirror_queue_coordinator:ensure_monitoring( - CPid, MonitoringPids), + MPids = pmon:monitored(KS), + ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MonitoringPids), + CPid, BQ, BQS, GM, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, MTC), + AckTags, Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> |
