summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-20 18:27:35 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-20 18:27:35 +0100
commit644a66996de65456ea5716436f9d615287ebf41f (patch)
tree97237baa0c833da332bb287a85afafe39ee244be
parentc5edb3dbca4f3e046101203e2e77bd545790a674 (diff)
downloadrabbitmq-server-git-644a66996de65456ea5716436f9d615287ebf41f.tar.gz
That's an awful lot of work to solve a potential memory leak...
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl47
-rw-r--r--src/rabbit_mirror_queue_master.erl53
-rw-r--r--src/rabbit_mirror_queue_slave.erl179
5 files changed, 257 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8c374ef3cd..0550f13b5c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,6 +33,7 @@
%% internal
-export([internal_declare/2, internal_delete/1,
run_backing_queue/3, run_backing_queue_async/3,
+ run_backing_queue/4, run_backing_queue_async/4,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -149,6 +150,14 @@
-spec(run_backing_queue_async/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue/4 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})),
+ integer() | 'default') -> 'ok').
+-spec(run_backing_queue_async/4 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})),
+ integer() | 'default') -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -448,10 +457,16 @@ internal_delete(QueueName) ->
end).
run_backing_queue(QPid, Mod, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
+ run_backing_queue(QPid, Mod, Fun, default).
run_backing_queue_async(QPid, Mod, Fun) ->
- gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
+ run_backing_queue_async(QPid, Mod, Fun, default).
+
+run_backing_queue(QPid, Mod, Fun, Priority) ->
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun, Priority}, infinity).
+
+run_backing_queue_async(QPid, Mod, Fun, Priority) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun, Priority}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d654f37233..7daf869bfc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -127,7 +127,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State = requeue_and_run(
AckTags,
process_args(
- #q{q = Q#amqqueue{pid = self()},
+ #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -843,29 +843,31 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {run_backing_queue, _Mod, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun, default} -> 6;
+ {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Mod, _Fun, default} -> 6;
+ {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -1079,11 +1081,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) ->
reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
+handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 8ddda1cd43..5660112a45 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/2, get_gm/1]).
+-export([start_link/3, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -30,7 +30,9 @@
-include("gm_specs.hrl").
-record(state, { q,
- gm
+ gm,
+ monitors,
+ death_fun
}).
-define(ONE_SECOND, 1000).
@@ -223,17 +225,20 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM) ->
- gen_server2:start_link(?MODULE, [Queue, GM], []).
+start_link(Queue, GM, DeathFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
+ensure_monitoring(CPid, Pids) ->
+ gen_server2:cast(CPid, {ensure_monitoring, Pids}).
+
%% ---------------------------------------------------------------------------
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
GM1 = case GM of
undefined ->
ok = gm:create_tables(),
@@ -248,7 +253,11 @@ init([#amqqueue { name = QueueName } = Q, GM]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q, gm = GM1 }, hibernate,
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun },
+ hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(get_gm, _From, State = #state { gm = GM }) ->
@@ -265,7 +274,29 @@ handle_cast({gm_deaths, Deaths},
noreply(State);
{error, not_found} ->
{stop, normal, State}
- end.
+ end;
+
+handle_cast({ensure_monitoring, Pids},
+ State = #state { monitors = Monitors }) ->
+ Monitors1 =
+ lists:foldl(fun (Pid, MonitorsN) ->
+ case dict:is_key(Pid, MonitorsN) of
+ true -> MonitorsN;
+ false -> MRef = erlang:monitor(process, Pid),
+ dict:store(Pid, MRef, MonitorsN)
+ end
+ end, Monitors, Pids),
+ noreply(State #state { monitors = Monitors1 }).
+
+handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
+ State = #state { monitors = Monitors,
+ death_fun = Fun }) ->
+ noreply(
+ case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = Fun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -295,6 +326,8 @@ members_changed([CPid], _Births, Deaths) ->
handle_msg([_CPid], _From, heartbeat) ->
ok;
+handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e973ea7837..0e7f32f05b 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/5]).
+-export([promote_backing_queue_state/6, sender_death_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -39,7 +39,8 @@
set_delivered,
seen_status,
confirmed,
- ack_msg_id
+ ack_msg_id,
+ known_senders
}).
%% For general documentation of HA design, see
@@ -58,9 +59,31 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
+sender_death_fun() ->
+ Self = self(),
+ fun (DeadPid) ->
+ %% Purposefully set the priority to 0 here so that we
+ %% don't overtake any messages from DeadPid that are
+ %% already in the queue.
+ rabbit_amqqueue:run_backing_queue_async(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
+ rabbit_log:info("Master saw death of sender ~p~n", [DeadPid]),
+ case sets:is_element(DeadPid, KS) of
+ false ->
+ State;
+ true ->
+ ok = gm:broadcast(GM, {sender_death, DeadPid}),
+ KS1 = sets:del_element(DeadPid, KS),
+ State #state { known_senders = KS1 }
+ end
+ end, 0)
+ end.
+
init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
AsyncCallback, SyncCallback) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
Nodes1 = case Nodes of
@@ -78,9 +101,10 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new() }.
+ ack_msg_id = dict:new(),
+ known_senders = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -88,7 +112,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
set_delivered = BQ:len(BQS),
seen_status = SeenStatus,
confirmed = [],
- ack_msg_id = dict:new() }.
+ ack_msg_id = dict:new(),
+ known_senders = sets:from_list(KS) }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -119,7 +144,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State #state { backing_queue_state = BQS1 }.
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
@@ -136,8 +161,9 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {AckTag, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
+ {AckTag,
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 })}.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -341,3 +367,12 @@ maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
maybe_store_acktag(AckTag, MsgId, AM) ->
dict:store(AckTag, MsgId, AM).
+
+ensure_monitoring(ChPid, State = #state { coordinator = CPid,
+ known_senders = KS }) ->
+ case sets:is_element(ChPid, KS) of
+ true -> State;
+ false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+ CPid, [ChPid]),
+ State #state { known_senders = sets:add_element(ChPid, KS) }
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 63a43197dd..7fc2c8cbca 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -49,10 +49,11 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(DEATH_TIMEOUT, 20000). %% 20 seconds
-record(state, { q,
gm,
- master_node,
+ master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -62,7 +63,8 @@
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
- msg_id_status
+ msg_id_status,
+ known_senders
}).
start_link(Q) ->
@@ -102,7 +104,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
BQS = bq_init(BQ, Q, false),
{ok, #state { q = Q,
gm = GM,
- master_node = node(MPid),
+ master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -112,7 +114,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
msg_id_ack = dict:new(),
ack_num = 0,
- msg_id_status = dict:new()
+ msg_id_status = dict:new(),
+ known_senders = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -140,9 +143,9 @@ handle_call({deliver, Delivery = #delivery {}}, From, State) ->
noreply(maybe_enqueue_message(Delivery, true, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- gm = GM,
- master_node = MNode }) ->
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_pid = MPid }) ->
rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
[rabbit_misc:rs(QueueName),
rabbit_misc:pid_to_string(self()),
@@ -150,7 +153,7 @@ handle_call({gm_deaths, Deaths}, From,
%% The GM has told us about deaths, which means we're not going to
%% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= MNode ->
+ {ok, Pid} when node(Pid) =:= node(MPid) ->
%% master hasn't changed
reply(ok, State);
{ok, Pid} when node(Pid) =:= node() ->
@@ -161,20 +164,20 @@ handle_call({gm_deaths, Deaths}, From,
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
ok = gm:broadcast(GM, heartbeat),
- noreply(State #state { master_node = node(Pid) });
+ noreply(State #state { master_pid = Pid });
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State}
end;
-handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) ->
reply(ok, run_backing_queue(Mod, Fun, State));
handle_call({commit, _Txn, _ChPid}, _From, State) ->
%% We don't support transactions in mirror queues
reply(ok, State).
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
+handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast({gm, Instruction}, State) ->
@@ -215,11 +218,14 @@ handle_cast({rollback, _Txn, _ChPid}, State) ->
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
-handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
- State = #state { gm = GM }) ->
- ok = gm:broadcast(GM, {process_death, Pid}),
+handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
+ State = #state { gm = GM, master_pid = MPid }) ->
+ ok = gm:broadcast(GM, {process_death, MPid}),
noreply(State);
+handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
+ noreply(local_sender_death(ChPid, State));
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -259,21 +265,23 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
- {run_backing_queue, _Mod, _Fun} -> 6;
- {gm_deaths, _Deaths} -> 5;
- _ -> 0
+ {run_backing_queue, _Mod, _Fun, default} -> 6;
+ {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- {run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
- {gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
- _ -> 0
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {run_backing_queue, _Mod, _Fun, default} -> 6;
+ {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
+ _ -> 0
end.
%% ---------------------------------------------------------------------------
@@ -291,6 +299,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
+ %% This is only of value to the master
+ ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
handle_msg([SPid], _From, Msg) ->
@@ -327,9 +338,9 @@ bq_init(BQ, Q, Recover) ->
end).
run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
- %% Yes, this might look a little crazy, but see comments around
- %% process_instruction({tx_commit,...}, State).
- Fun(rabbit_mirror_queue_master, State);
+ %% Yes, this might look a little crazy, but see comments in
+ %% local_sender_death/2
+ Fun(?MODULE, State);
run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
@@ -392,15 +403,27 @@ promote_me(From, #state { q = Q,
rate_timer_ref = RateTRef,
sender_queues = SQ,
msg_id_ack = MA,
- msg_id_status = MS }) ->
+ msg_id_status = MS,
+ known_senders = KS }) ->
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(Q #amqqueue.name),
rabbit_misc:pid_to_string(self())]),
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
+ Q1 = Q #amqqueue { pid = self() },
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% Everything that we're monitoring, we need to ensure our new
+ %% coordinator is monitoring.
+
+ MonitoringPids = [begin true = erlang:demonitor(MRef),
+ Pid
+ end || {Pid, MRef} <- dict:to_list(KS)],
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+ CPid, MonitoringPids),
+
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
%% then we pass them to the
@@ -472,7 +495,7 @@ promote_me(From, #state { q = Q,
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS),
+ CPid, BQ, BQS, GM, SS, MonitoringPids),
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
@@ -482,7 +505,7 @@ promote_me(From, #state { q = Q,
Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ),
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, rabbit_mirror_queue_master, MasterState, RateTRef,
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, MTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
@@ -540,6 +563,52 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #state { rate_timer_ref = undefined }.
+ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
+ case dict:is_key(ChPid, KS) of
+ true -> State;
+ false -> MRef = erlang:monitor(process, ChPid),
+ State #state { known_senders = dict:store(ChPid, MRef, KS) }
+ end.
+
+local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+ case dict:is_key(ChPid, KS) of
+ false ->
+ ok;
+ true ->
+ %% We have to deal with the possibility that we'll be
+ %% promoted to master before this thing gets
+ %% run. Consequently we set the module to
+ %% rabbit_mirror_queue_master so that if we do become a
+ %% rabbit_amqqueue_process before then, sane things will
+ %% happen.
+ Fun =
+ fun (?MODULE, State1 = #state { known_senders = KS1,
+ gm = GM }) ->
+ %% We're running still as a slave
+ ok = case dict:is_key(ChPid, KS1) of
+ false ->
+ ok;
+ true ->
+ gm:broadcast(
+ GM, {ensure_monitoring, [ChPid]})
+ end,
+ State1;
+ (rabbit_mirror_queue_master, State1) ->
+ %% We've become a master. State1 is now opaque
+ %% to us. When we became master, if ChPid was
+ %% still known to us then we'd have set up
+ %% monitoring of it then, so this is now a
+ %% noop.
+ State1
+ end,
+ %% Note that we do not remove our knowledge of this ChPid
+ %% until we get the sender_death from GM.
+ timer:apply_after(
+ ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async,
+ [self(), rabbit_mirror_queue_master, Fun])
+ end,
+ State.
+
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
@@ -548,6 +617,7 @@ maybe_enqueue_message(
EnqueueOnPromotion,
State = #state { sender_queues = SQ,
msg_id_status = MS }) ->
+ State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
@@ -557,30 +627,30 @@ maybe_enqueue_message(
end,
SQ1 = dict:store(ChPid,
queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ),
- State #state { sender_queues = SQ1 };
+ State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- State #state { msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS) };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State) of
+ case needs_confirming(Delivery, State1) of
never ->
- State #state { msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS) };
eventually ->
- State #state {
+ State1 #state {
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- State #state { msg_id_status = dict:erase(MsgId, MS) }
+ State1 #state { msg_id_status = dict:erase(MsgId, MS) }
end;
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
- State #state { msg_id_status = dict:erase(MsgId, MS) }
+ State1 #state { msg_id_status = dict:erase(MsgId, MS) }
end;
maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) ->
%% We don't support txns in mirror queues.
@@ -601,6 +671,7 @@ process_instruction(
%% which means that we're going to have to hang on to the fact
%% that we've seen the msg_id confirmed until we can associate it
%% with a msg_seq_no.
+ State1 = ensure_monitoring(ChPid, State),
MS1 = dict:store(MsgId, {published, ChPid}, MS),
{SQ1, MS2} =
case dict:find(ChPid, SQ) of
@@ -618,7 +689,7 @@ process_instruction(
%% first. Thus we need to deal with confirms
%% here.
{dict:store(ChPid, MQ1, SQ),
- case needs_confirming(Delivery, State) of
+ case needs_confirming(Delivery, State1) of
never ->
MS;
eventually ->
@@ -639,19 +710,19 @@ process_instruction(
end
end,
- State1 = State #state { sender_queues = SQ1,
- msg_id_status = MS2 },
+ State2 = State1 #state { sender_queues = SQ1,
+ msg_id_status = MS2 },
{ok,
case Deliver of
false ->
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State1 #state { backing_queue_state = BQS1 };
+ State2 #state { backing_queue_state = BQS1 };
{true, AckRequired} ->
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
- State1 #state { backing_queue_state = BQS1 })
+ State2 #state { backing_queue_state = BQS1 })
end};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
@@ -660,6 +731,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
msg_id_status = MS }) ->
%% Many of the comments around the publish head above apply here
%% too.
+ State1 = ensure_monitoring(ChPid, State),
MS1 = dict:store(MsgId, discarded, MS),
{SQ1, MS2} =
case dict:find(ChPid, SQ) of
@@ -685,9 +757,9 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
end
end,
BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State #state { sender_queues = SQ1,
- msg_id_status = MS2,
- backing_queue_state = BQS1 }};
+ {ok, State1 #state { sender_queues = SQ1,
+ msg_id_status = MS2,
+ backing_queue_state = BQS1 }};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -746,6 +818,19 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State #state { msg_id_ack = dict:new(),
backing_queue_state = BQS2 }
end};
+process_instruction({sender_death, ChPid},
+ State = #state { sender_queues = SQ,
+ known_senders = KS }) ->
+ rabbit_log:info("Slave received death of sender ~p~n", [ChPid]),
+ {ok, case dict:find(ChPid, KS) of
+ error ->
+ State;
+ {ok, MRef} ->
+ true = erlang:demonitor(MRef),
+ KS1 = dict:erase(ChPid, KS),
+ SQ1 = dict:erase(ChPid, SQ),
+ State #state { sender_queues = SQ1, known_senders = KS1}
+ end};
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->