diff options
| -rw-r--r-- | src/pmon.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 143 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 36 |
9 files changed, 218 insertions, 167 deletions
diff --git a/src/pmon.erl b/src/pmon.erl new file mode 100644 index 0000000000..457865774b --- /dev/null +++ b/src/pmon.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(pmon). + +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). + +-ifdef(use_specs). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: dict()). + +-spec(new/0 :: () -> ?MODULE()). +-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). + +-endif. + +new() -> dict:new(). + +monitor(Pid, M) -> + case dict:is_key(Pid, M) of + true -> M; + false -> dict:store(Pid, erlang:monitor(process, Pid), M) + end. + +monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids). + +demonitor(Pid, M) -> + case dict:find(Pid, M) of + {ok, MRef} -> erlang:demonitor(MRef), + dict:erase(Pid, M); + error -> M + end. + +is_monitored(Pid, M) -> dict:is_key(Pid, M). + +erase(Pid, M) -> dict:erase(Pid, M). + +monitored(M) -> dict:fetch_keys(M). + +is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ecbcbc339..c1673504e7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_immediately/1 :: (qpids()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], @@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) -> check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_positive_int_arg({Type, Val}, _Args) -> +check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of - false -> {error, {unacceptable_type, Type}}; - true when Val =< 0 -> {error, {value_zero_or_less, Val}}; - true -> ok + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_positive_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error + end. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> @@ -455,8 +468,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -delete_immediately(#amqqueue{ pid = QPid }) -> - gen_server2:cast(QPid, delete_immediately). +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], + ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index eb9ee8353a..3caf728ba0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed, delayed_stop, @@ -74,9 +75,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -131,18 +132,19 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -538,17 +541,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + case attempt_delivery(Delivery, Confirm, State) of + {true, State1} -> + maybe_record_confirm_message(Confirm, State1); + %% the next two are optimisations + {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> + discard_delivery(Delivery, State1); + {false, State1 = #q{ttl = 0, dlx = undefined}} -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + discard_delivery(Delivery, State1); + {false, State1} -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -597,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -618,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -728,47 +740,32 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, rabbit_basic:delivery( false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> cleanup_after_confirm([AckTag], State2); + [] -> cleanup_after_confirm([AckTag], State1); _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State2#q{unconfirmed = UC1}) - end. - -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - rabbit_log:warning( - "DLQ ~p for ~s died with ~p unconfirmed messages~n", - [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = dict:erase(QPid, QMons), - unconfirmed = UC1}) + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + QNameS = rabbit_misc:rs(qname(State)), + rabbit_log:warning("DLQ ~p for ~s died with " + "~p unconfirmed messages~n", + [QPid, QNameS, length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. stop_later(Reason, State) -> @@ -1186,7 +1183,8 @@ handle_call(force_event_refresh, _From, handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of - false -> demonitor_queue(QPid, State); + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], @@ -1200,22 +1198,19 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender, msg_seq_no = MsgSeqNo}, Flow}, - State) -> + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); + Senders1 = case Flow of + flow -> credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> Senders + end, + State1 = State#q{senders = Senders1}, + case already_been_here(Delivery, State1) of + false -> noreply(deliver_or_enqueue(Delivery, State1)); Qs -> log_cycle_once(Qs), rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) + noreply(State1) end; handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 7b00fa5f13..286b69e4ac 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c1c11d894..846890a145 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = sets:new(), + queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), credit_flow:peer_down(QPid), erase_queue_stats(QPid), - noreply(State3#ch{queue_monitors = - sets:del_element(QPid, State3#ch.queue_monitors)}); + noreply(State3#ch{queue_monitors = pmon:erase( + QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -758,9 +758,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, fun () -> {error, not_found} end, fun () -> rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, - ok_msg(NoWait, #'basic.cancel_ok'{ - consumer_tag = ConsumerTag})) + Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg)) end) of ok -> {noreply, NewState}; @@ -937,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q = #amqqueue{}} -> + {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as @@ -945,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( - CollectorPid, Q) + CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> @@ -1091,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) -> consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( @@ -1103,18 +1102,12 @@ consumer_monitor(ConsumerTag, end, gb_sets:singleton(ConsumerTag), QCons), - monitor_queue(QPid, State#ch{queue_consumers = QCons1}); + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}; _ -> State end. -monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case sets:is_element(QPid, QMons) of - false -> erlang:monitor(process, QPid), - State#ch{queue_monitors = sets:add_element(QPid, QMons)}; - true -> State - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1324,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QNames}, State) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State1 = State#ch{queue_monitors = + pmon:monitor_all(DeliveredQPids, + State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 83e28c44a8..910a89b42c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +%% Optimisation +route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, + #delivery{message = #basic_message{routing_keys = RKs}}) -> + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + route(X = #exchange{name = XName}, Delivery) -> route1(Delivery, {queue:from_list([X]), XName, []}). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index d0b5bab779..2d155d14e3 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, - monitors = dict:new(), + monitors = pmon:new(), death_fun = DeathFun, length_fun = LengthFun }, hibernate, @@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> ok = LengthFun(), noreply(State); -handle_cast({ensure_monitoring, Pids}, - State = #state { monitors = Monitors }) -> - Monitors1 = - lists:foldl(fun (Pid, MonitorsN) -> - case dict:is_key(Pid, MonitorsN) of - true -> MonitorsN; - false -> MRef = erlang:monitor(process, Pid), - dict:store(Pid, MRef, MonitorsN) - end - end, Monitors, Pids), - noreply(State #state { monitors = Monitors1 }). +handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> gm:broadcast(GM, heartbeat), @@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> noreply(State); handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, - State = #state { monitors = Monitors, + State = #state { monitors = Mons, death_fun = DeathFun }) -> - noreply(case dict:is_key(Pid, Monitors) of + noreply(case pmon:is_monitored(Pid, Mons) of false -> State; true -> ok = DeathFun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } + State #state { monitors = pmon:erase(Pid, Mons) } end); handle_info(Msg, State) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 98a80a2619..4b095209da 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new(), + known_senders = pmon:new(), synchronised = false }, @@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], dict:new()), + Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - - MonitoringPids = [begin put({ch_publisher, Pid}, MRef), - Pid - end || {Pid, MRef} <- dict:to_list(KS)], - ok = rabbit_mirror_queue_coordinator:ensure_monitoring( - CPid, MonitoringPids), + MPids = pmon:monitored(KS), + ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MonitoringPids), + CPid, BQ, BQS, GM, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, MTC), + AckTags, Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> @@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - case dict:is_key(ChPid, KS) of - true -> State; - false -> MRef = erlang:monitor(process, ChPid), - State #state { known_senders = dict:store(ChPid, MRef, KS) } - end. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> - ok = case dict:is_key(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> credit_flow:peer_down(ChPid), confirm_sender_death(ChPid) @@ -628,7 +620,7 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave - ok = case dict:is_key(Pid, KS) of + ok = case pmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -846,7 +838,7 @@ process_instruction({ack, MsgIds}, process_instruction({fold, MsgFun, AckTags}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQS1 = BQ:fold(AckTags, MsgFun, BQS), + BQS1 = BQ:fold(MsgFun, BQS, AckTags), {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, @@ -871,21 +863,18 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - {ok, case dict:find(ChPid, KS) of - error -> - State; - {ok, MRef} -> - true = erlang:demonitor(MRef), - MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = dict:erase(ChPid, KS) } + {ok, case pmon:is_monitored(ChPid, KS) of + false -> State; + true -> MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index df957d883c..6dad01cc79 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues, delete_from}). +-record(state, {monitors, delete_from}). -include("rabbit.hrl"). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). -endif. @@ -51,39 +51,37 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new(), delete_from = undefined}}. + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- -handle_call({register, Q}, _From, - State = #state{queues = Queues, delete_from = Deleting}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> case Deleting of undefined -> ok; - _ -> rabbit_amqqueue:delete_immediately(Q) + _ -> ok = rabbit_amqqueue:delete_immediately([QPid]) end, - {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; -handle_call(delete_all, From, State = #state{queues = Queues, +handle_call(delete_all, From, State = #state{monitors = QMons, delete_from = undefined}) -> - case dict:size(Queues) of - 0 -> {reply, ok, State#state{delete_from = From}}; - _ -> [rabbit_amqqueue:delete_immediately(Q) - || {_MRef, Q} <- dict:to_list(Queues)], - {noreply, State#state{delete_from = From}} + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> ok = rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues, delete_from = Deleting}) -> - Queues1 = dict:erase(MonitorRef, Queues), - case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of true -> gen_server:reply(Deleting, ok); false -> ok end, - {noreply, State#state{queues = Queues1}}. + {noreply, State#state{monitors = QMons1}}. terminate(_Reason, _State) -> ok. |
