diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-20 18:27:35 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-20 18:27:35 +0100 |
| commit | 644a66996de65456ea5716436f9d615287ebf41f (patch) | |
| tree | 97237baa0c833da332bb287a85afafe39ee244be | |
| parent | c5edb3dbca4f3e046101203e2e77bd545790a674 (diff) | |
| download | rabbitmq-server-git-644a66996de65456ea5716436f9d615287ebf41f.tar.gz | |
That's an awful lot of work to solve a potential memory leak...
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 179 |
5 files changed, 257 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8c374ef3cd..0550f13b5c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,6 +33,7 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, run_backing_queue_async/3, + run_backing_queue/4, run_backing_queue_async/4, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -149,6 +150,14 @@ -spec(run_backing_queue_async/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/4 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})), + integer() | 'default') -> 'ok'). +-spec(run_backing_queue_async/4 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})), + integer() | 'default') -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -448,10 +457,16 @@ internal_delete(QueueName) -> end). run_backing_queue(QPid, Mod, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). + run_backing_queue(QPid, Mod, Fun, default). run_backing_queue_async(QPid, Mod, Fun) -> - gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). + run_backing_queue_async(QPid, Mod, Fun, default). + +run_backing_queue(QPid, Mod, Fun, Priority) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun, Priority}, infinity). + +run_backing_queue_async(QPid, Mod, Fun, Priority) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun, Priority}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d654f37233..7daf869bfc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -127,7 +127,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State = requeue_and_run( AckTags, process_args( - #q{q = Q#amqqueue{pid = self()}, + #q{q = Q, exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, @@ -843,29 +843,31 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {run_backing_queue, _Mod, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun, default} -> 6; + {run_backing_queue, _Mod, _Fun, Priority} -> Priority; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {run_backing_queue, _Mod, _Fun, default} -> 6; + {run_backing_queue, _Mod, _Fun, Priority} -> Priority; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -1079,11 +1081,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> +handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) -> reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({run_backing_queue, Mod, Fun}, State) -> +handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 8ddda1cd43..5660112a45 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/2, get_gm/1]). +-export([start_link/3, get_gm/1, ensure_monitoring/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,7 +30,9 @@ -include("gm_specs.hrl"). -record(state, { q, - gm + gm, + monitors, + death_fun }). -define(ONE_SECOND, 1000). @@ -223,17 +225,20 @@ %% %%---------------------------------------------------------------------------- -start_link(Queue, GM) -> - gen_server2:start_link(?MODULE, [Queue, GM], []). +start_link(Queue, GM, DeathFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). +ensure_monitoring(CPid, Pids) -> + gen_server2:cast(CPid, {ensure_monitoring, Pids}). + %% --------------------------------------------------------------------------- %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> GM1 = case GM of undefined -> ok = gm:create_tables(), @@ -248,7 +253,11 @@ init([#amqqueue { name = QueueName } = Q, GM]) -> end, {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), - {ok, #state { q = Q, gm = GM1 }, hibernate, + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun }, + hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(get_gm, _From, State = #state { gm = GM }) -> @@ -265,7 +274,29 @@ handle_cast({gm_deaths, Deaths}, noreply(State); {error, not_found} -> {stop, normal, State} - end. + end; + +handle_cast({ensure_monitoring, Pids}, + State = #state { monitors = Monitors }) -> + Monitors1 = + lists:foldl(fun (Pid, MonitorsN) -> + case dict:is_key(Pid, MonitorsN) of + true -> MonitorsN; + false -> MRef = erlang:monitor(process, Pid), + dict:store(Pid, MRef, MonitorsN) + end + end, Monitors, Pids), + noreply(State #state { monitors = Monitors1 }). + +handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, + State = #state { monitors = Monitors, + death_fun = Fun }) -> + noreply( + case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = Fun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -295,6 +326,8 @@ members_changed([CPid], _Births, Deaths) -> handle_msg([_CPid], _From, heartbeat) -> ok; +handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> + ok = gen_server2:cast(CPid, Msg); handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e973ea7837..0e7f32f05b 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/5]). +-export([promote_backing_queue_state/6, sender_death_fun/0]). -behaviour(rabbit_backing_queue). @@ -39,7 +39,8 @@ set_delivered, seen_status, confirmed, - ack_msg_id + ack_msg_id, + known_senders }). %% For general documentation of HA design, see @@ -58,9 +59,31 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). +sender_death_fun() -> + Self = self(), + fun (DeadPid) -> + %% Purposefully set the priority to 0 here so that we + %% don't overtake any messages from DeadPid that are + %% already in the queue. + rabbit_amqqueue:run_backing_queue_async( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> + rabbit_log:info("Master saw death of sender ~p~n", [DeadPid]), + case sets:is_element(DeadPid, KS) of + false -> + State; + true -> + ok = gm:broadcast(GM, {sender_death, DeadPid}), + KS1 = sets:del_element(DeadPid, KS), + State #state { known_senders = KS1 } + end + end, 0) + end. + init(#amqqueue { arguments = Args, name = QName } = Q, Recover, AsyncCallback, SyncCallback) -> - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), Nodes1 = case Nodes of @@ -78,9 +101,10 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover, set_delivered = 0, seen_status = dict:new(), confirmed = [], - ack_msg_id = dict:new() }. + ack_msg_id = dict:new(), + known_senders = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -88,7 +112,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> set_delivered = BQ:len(BQS), seen_status = SeenStatus, confirmed = [], - ack_msg_id = dict:new() }. + ack_msg_id = dict:new(), + known_senders = sets:from_list(KS) }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -119,7 +144,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State #state { backing_queue_state = BQS1 }. + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, @@ -136,8 +161,9 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), - {AckTag, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 }}. + {AckTag, + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 })}. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -341,3 +367,12 @@ maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). + +ensure_monitoring(ChPid, State = #state { coordinator = CPid, + known_senders = KS }) -> + case sets:is_element(ChPid, KS) of + true -> State; + false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, [ChPid]), + State #state { known_senders = sets:add_element(ChPid, KS) } + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 63a43197dd..7fc2c8cbca 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -49,10 +49,11 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(DEATH_TIMEOUT, 20000). %% 20 seconds -record(state, { q, gm, - master_node, + master_pid, backing_queue, backing_queue_state, sync_timer_ref, @@ -62,7 +63,8 @@ msg_id_ack, %% :: MsgId -> AckTag ack_num, - msg_id_status + msg_id_status, + known_senders }). start_link(Q) -> @@ -102,7 +104,7 @@ init([#amqqueue { name = QueueName } = Q]) -> BQS = bq_init(BQ, Q, false), {ok, #state { q = Q, gm = GM, - master_node = node(MPid), + master_pid = MPid, backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, @@ -112,7 +114,8 @@ init([#amqqueue { name = QueueName } = Q]) -> msg_id_ack = dict:new(), ack_num = 0, - msg_id_status = dict:new() + msg_id_status = dict:new(), + known_senders = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -140,9 +143,9 @@ handle_call({deliver, Delivery = #delivery {}}, From, State) -> noreply(maybe_enqueue_message(Delivery, true, State)); handle_call({gm_deaths, Deaths}, From, - State = #state { q = #amqqueue { name = QueueName }, - gm = GM, - master_node = MNode }) -> + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_pid = MPid }) -> rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", [rabbit_misc:rs(QueueName), rabbit_misc:pid_to_string(self()), @@ -150,7 +153,7 @@ handle_call({gm_deaths, Deaths}, From, %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= MNode -> + {ok, Pid} when node(Pid) =:= node(MPid) -> %% master hasn't changed reply(ok, State); {ok, Pid} when node(Pid) =:= node() -> @@ -161,20 +164,20 @@ handle_call({gm_deaths, Deaths}, From, gen_server2:reply(From, ok), erlang:monitor(process, Pid), ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_node = node(Pid) }); + noreply(State #state { master_pid = Pid }); {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State} end; -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> +handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) -> reply(ok, run_backing_queue(Mod, Fun, State)); handle_call({commit, _Txn, _ChPid}, _From, State) -> %% We don't support transactions in mirror queues reply(ok, State). -handle_cast({run_backing_queue, Mod, Fun}, State) -> +handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast({gm, Instruction}, State) -> @@ -215,11 +218,14 @@ handle_cast({rollback, _Txn, _ChPid}, State) -> handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); -handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, - State = #state { gm = GM }) -> - ok = gm:broadcast(GM, {process_death, Pid}), +handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, + State = #state { gm = GM, master_pid = MPid }) -> + ok = gm:broadcast(GM, {process_death, MPid}), noreply(State); +handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> + noreply(local_sender_death(ChPid, State)); + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -259,21 +265,23 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of - {run_backing_queue, _Mod, _Fun} -> 6; - {gm_deaths, _Deaths} -> 5; - _ -> 0 + {run_backing_queue, _Mod, _Fun, default} -> 6; + {run_backing_queue, _Mod, _Fun, Priority} -> Priority; + {gm_deaths, _Deaths} -> 5; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; - {gm, _Msg} -> 5; - {post_commit, _Txn, _AckTags} -> 4; - _ -> 0 + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun, default} -> 6; + {run_backing_queue, _Mod, _Fun, Priority} -> Priority; + sync_timeout -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 end. %% --------------------------------------------------------------------------- @@ -291,6 +299,9 @@ members_changed([SPid], _Births, Deaths) -> handle_msg([_SPid], _From, heartbeat) -> ok; +handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> + %% This is only of value to the master + ok; handle_msg([SPid], _From, {process_death, Pid}) -> inform_deaths(SPid, [Pid]); handle_msg([SPid], _From, Msg) -> @@ -327,9 +338,9 @@ bq_init(BQ, Q, Recover) -> end). run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> - %% Yes, this might look a little crazy, but see comments around - %% process_instruction({tx_commit,...}, State). - Fun(rabbit_mirror_queue_master, State); + %% Yes, this might look a little crazy, but see comments in + %% local_sender_death/2 + Fun(?MODULE, State); run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. @@ -392,15 +403,27 @@ promote_me(From, #state { q = Q, rate_timer_ref = RateTRef, sender_queues = SQ, msg_id_ack = MA, - msg_id_status = MS }) -> + msg_id_status = MS, + known_senders = KS }) -> rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(Q #amqqueue.name), rabbit_misc:pid_to_string(self())]), - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), + Q1 = Q #amqqueue { pid = self() }, + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), + %% Everything that we're monitoring, we need to ensure our new + %% coordinator is monitoring. + + MonitoringPids = [begin true = erlang:demonitor(MRef), + Pid + end || {Pid, MRef} <- dict:to_list(KS)], + ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, MonitoringPids), + %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion %% then we pass them to the @@ -472,7 +495,7 @@ promote_me(From, #state { q = Q, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS), + CPid, BQ, BQS, GM, SS, MonitoringPids), MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || @@ -482,7 +505,7 @@ promote_me(From, #state { q = Q, Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ), {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, rabbit_mirror_queue_master, MasterState, RateTRef, + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. @@ -540,6 +563,52 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #state { rate_timer_ref = undefined }. +ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> + case dict:is_key(ChPid, KS) of + true -> State; + false -> MRef = erlang:monitor(process, ChPid), + State #state { known_senders = dict:store(ChPid, MRef, KS) } + end. + +local_sender_death(ChPid, State = #state { known_senders = KS }) -> + case dict:is_key(ChPid, KS) of + false -> + ok; + true -> + %% We have to deal with the possibility that we'll be + %% promoted to master before this thing gets + %% run. Consequently we set the module to + %% rabbit_mirror_queue_master so that if we do become a + %% rabbit_amqqueue_process before then, sane things will + %% happen. + Fun = + fun (?MODULE, State1 = #state { known_senders = KS1, + gm = GM }) -> + %% We're running still as a slave + ok = case dict:is_key(ChPid, KS1) of + false -> + ok; + true -> + gm:broadcast( + GM, {ensure_monitoring, [ChPid]}) + end, + State1; + (rabbit_mirror_queue_master, State1) -> + %% We've become a master. State1 is now opaque + %% to us. When we became master, if ChPid was + %% still known to us then we'd have set up + %% monitoring of it then, so this is now a + %% noop. + State1 + end, + %% Note that we do not remove our knowledge of this ChPid + %% until we get the sender_death from GM. + timer:apply_after( + ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async, + [self(), rabbit_mirror_queue_master, Fun]) + end, + State. + maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, @@ -548,6 +617,7 @@ maybe_enqueue_message( EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> + State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> @@ -557,30 +627,30 @@ maybe_enqueue_message( end, SQ1 = dict:store(ChPid, queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ), - State #state { sender_queues = SQ1 }; + State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - State #state { msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS) }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. - case needs_confirming(Delivery, State) of + case needs_confirming(Delivery, State1) of never -> - State #state { msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS) }; eventually -> - State #state { + State1 #state { msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - State #state { msg_id_status = dict:erase(MsgId, MS) } + State1 #state { msg_id_status = dict:erase(MsgId, MS) } end; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. - State #state { msg_id_status = dict:erase(MsgId, MS) } + State1 #state { msg_id_status = dict:erase(MsgId, MS) } end; maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> %% We don't support txns in mirror queues. @@ -601,6 +671,7 @@ process_instruction( %% which means that we're going to have to hang on to the fact %% that we've seen the msg_id confirmed until we can associate it %% with a msg_seq_no. + State1 = ensure_monitoring(ChPid, State), MS1 = dict:store(MsgId, {published, ChPid}, MS), {SQ1, MS2} = case dict:find(ChPid, SQ) of @@ -618,7 +689,7 @@ process_instruction( %% first. Thus we need to deal with confirms %% here. {dict:store(ChPid, MQ1, SQ), - case needs_confirming(Delivery, State) of + case needs_confirming(Delivery, State1) of never -> MS; eventually -> @@ -639,19 +710,19 @@ process_instruction( end end, - State1 = State #state { sender_queues = SQ1, - msg_id_status = MS2 }, + State2 = State1 #state { sender_queues = SQ1, + msg_id_status = MS2 }, {ok, case Deliver of false -> BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State1 #state { backing_queue_state = BQS1 }; + State2 #state { backing_queue_state = BQS1 }; {true, AckRequired} -> {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, - State1 #state { backing_queue_state = BQS1 }) + State2 #state { backing_queue_state = BQS1 }) end}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, @@ -660,6 +731,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, msg_id_status = MS }) -> %% Many of the comments around the publish head above apply here %% too. + State1 = ensure_monitoring(ChPid, State), MS1 = dict:store(MsgId, discarded, MS), {SQ1, MS2} = case dict:find(ChPid, SQ) of @@ -685,9 +757,9 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, end end, BQS1 = BQ:discard(Msg, ChPid, BQS), - {ok, State #state { sender_queues = SQ1, - msg_id_status = MS2, - backing_queue_state = BQS1 }}; + {ok, State1 #state { sender_queues = SQ1, + msg_id_status = MS2, + backing_queue_state = BQS1 }}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -746,6 +818,19 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State #state { msg_id_ack = dict:new(), backing_queue_state = BQS2 } end}; +process_instruction({sender_death, ChPid}, + State = #state { sender_queues = SQ, + known_senders = KS }) -> + rabbit_log:info("Slave received death of sender ~p~n", [ChPid]), + {ok, case dict:find(ChPid, KS) of + error -> + State; + {ok, MRef} -> + true = erlang:demonitor(MRef), + KS1 = dict:erase(ChPid, KS), + SQ1 = dict:erase(ChPid, SQ), + State #state { sender_queues = SQ1, known_senders = KS1} + end}; process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |
