diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-08-06 16:45:33 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-08-06 16:45:33 +0100 |
| commit | b5b1a41a3779acc1072673a98189db490dec75e3 (patch) | |
| tree | 249fd6e5a50242271f25946f31dbb4077824d95a /src | |
| parent | e99bb3ea4832648f5b613b9f19a895a7edd161f6 (diff) | |
| download | rabbitmq-server-git-b5b1a41a3779acc1072673a98189db490dec75e3.tar.gz | |
Delay clearing of state in slaves
until sender down notification is received from channel as well as GM
in order to avoid messages being enqueued more than once
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 78 |
2 files changed, 63 insertions, 42 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index c9918fed2e..f54e9bd18b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -222,20 +222,19 @@ %% sender_death message to all the slaves, saying the sender has %% died. Once the slaves receive the sender_death message, they know %% that they're not going to receive any more instructions from the gm -%% regarding that sender, thus they throw away any publications from -%% the sender pending publication instructions. However, it is -%% possible that the coordinator receives the DOWN and communicates -%% that to the master before the master has finished receiving and -%% processing publishes from the sender. This turns out not to be a -%% problem: the sender has actually died, and so will not need to -%% receive confirms or other feedback, and should further messages be -%% "received" from the sender, the master will ask the coordinator to -%% set up a new monitor, and will continue to process the messages -%% normally. Slaves may thus receive publishes via gm from previously -%% declared "dead" senders, but again, this is fine: should the slave -%% have just thrown out the message it had received directly from the -%% sender (due to receiving a sender_death message via gm), it will be -%% able to cope with the publication purely from the master via gm. +%% regarding that sender. However, it is possible that the coordinator +%% receives the DOWN and communicates that to the master before the +%% master has finished receiving and processing publishes from the +%% sender. This turns out not to be a problem: the sender has actually +%% died, and so will not need to receive confirms or other feedback, +%% and should further messages be "received" from the sender, the +%% master will ask the coordinator to set up a new monitor, and +%% will continue to process the messages normally. Slaves may thus +%% receive publishes via gm from previously declared "dead" senders, +%% but again, this is fine: should the slave have just thrown out the +%% message it had received directly from the sender (due to receiving +%% a sender_death message via gm), it will be able to cope with the +%% publication purely from the master via gm. %% %% When a slave receives a DOWN message for a sender, if it has not %% received the sender_death message from the master via gm already, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1996fd0a7a..6425a85580 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -61,7 +61,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q Msg, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState} msg_id_ack, %% :: MsgId -> AckTag msg_id_status, @@ -275,7 +275,7 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> local_sender_death(ChPid, State), - noreply(State); + noreply(sender_lifetime(ChPid, down_from_ch, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -563,10 +563,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - Delivery <- queue:to_list(PubQ)], + Deliveries = [Delivery || + {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), + Delivery <- queue:to_list(PubQ)], + AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], + KS1 = lists:foldl(fun (ChPid0, KS0) -> + pmon:demonitor(ChPid0, KS0) + end, KS, AwaitGmDown), rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS, + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). noreply(State) -> @@ -643,12 +648,39 @@ confirm_sender_death(Pid) -> State end, %% Note that we do not remove our knowledge of this ChPid until we - %% get the sender_death from GM. + %% get the sender_death from GM as well as a DOWN notification. {ok, _TRef} = timer:apply_after( ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, [self(), rabbit_mirror_queue_master, Fun]), ok. +forget_sender(running, _) -> false; +forget_sender(_, running) -> false; +forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. + +%% Record and process lifetime events from channels. Forget all about a channel +%% only when down notifications are received from both the channel and from gm. +sender_lifetime(ChPid, ChState, State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + case dict:find(ChPid, SQ) of + error -> + State; + {ok, {MQ, PendCh, ChStateRecord}} -> + case forget_sender(ChState, ChStateRecord) of + true -> + credit_flow:peer_down(ChPid), + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = lists:foldl( + fun dict:erase/2, + MS, sets:to_list(PendCh)), + known_senders = pmon:demonitor(ChPid, KS) }; + false -> + SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ), + State #state { sender_queues = SQ1 } + end + end. + maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, @@ -657,9 +689,9 @@ maybe_enqueue_message( %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), MQ1 = queue:in(Delivery, MQ), - SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> MS1 = send_or_record_confirm( @@ -671,7 +703,7 @@ maybe_enqueue_message( get_sender_queue(ChPid, SQ) -> case dict:find(ChPid, SQ) of - error -> {queue:new(), sets:new()}; + error -> {queue:new(), sets:new(), running}; {ok, Val} -> Val end. @@ -679,19 +711,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> case dict:find(ChPid, SQ) of error -> SQ; - {ok, {MQ, PendingCh}} -> - dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + {ok, {MQ, PendingCh, ChState}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, + SQ) end. publish_or_discard(Status, ChPid, MsgId, State = #state { sender_queues = SQ, msg_id_status = MS }) -> %% We really are going to do the publish/discard right now, even %% though we may not have seen it directly from the channel. But - %% we cannot issues confirms until the latter has happened. So we + %% we cannot issue confirms until the latter has happened. So we %% need to keep track of the MsgId and its confirmation status in %% the meantime. State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> @@ -711,7 +744,7 @@ publish_or_discard(Status, ChPid, MsgId, %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ), State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. @@ -772,25 +805,14 @@ process_instruction({requeue, MsgIds}, {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, - State = #state { sender_queues = SQ, - msg_id_status = MS, - known_senders = KS }) -> + State = #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a message %% from it. In this case we just want to avoid doing work if we %% never got any messages. {ok, case pmon:is_monitored(ChPid, KS) of false -> State; - true -> MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - credit_flow:peer_down(ChPid), - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = pmon:demonitor(ChPid, KS) } + true -> credit_flow:peer_down(ChPid), + sender_lifetime(ChPid, down_from_gm, State) end}; process_instruction({depth, Depth}, State = #state { backing_queue = BQ, |
