diff options
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 172 |
1 files changed, 93 insertions, 79 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 558e372eb4..b6aaecb79f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -59,7 +59,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> MsgQ + sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag ack_num, @@ -500,7 +500,7 @@ promote_me(From, #state { q = Q, {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], - Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ), + Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, @@ -610,47 +610,65 @@ maybe_enqueue_message( sender = ChPid, txn = none }, EnqueueOnPromotion, - State = #state { sender_queues = SQ, - msg_id_status = MS }) -> + State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> - MQ = case dict:find(ChPid, SQ) of - {ok, MQ1} -> MQ1; - error -> queue:new() - end, - SQ1 = dict:store(ChPid, - queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - State1 #state { msg_id_status = dict:erase(MsgId, MS) }; + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { sender_queues = SQ1, + msg_id_status = dict:erase(MsgId, MS) }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. case needs_confirming(Delivery, State1) of never -> - State1 #state { msg_id_status = dict:erase(MsgId, MS) }; + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; eventually -> State1 #state { msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - State1 #state { msg_id_status = dict:erase(MsgId, MS) } + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } end; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. - State1 #state { msg_id_status = dict:erase(MsgId, MS) } + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } end; maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> %% We don't support txns in mirror queues. State. +get_sender_queue(ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> {queue:new(), sets:new()}; + {ok, Val} -> Val + end. + +remove_from_pending_ch(MsgId, ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> + SQ; + {ok, {MQ, PendingCh}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + end. + process_instruction( {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, @@ -667,46 +685,39 @@ process_instruction( %% that we've seen the msg_id confirmed until we can associate it %% with a msg_seq_no. State1 = ensure_monitoring(ChPid, State), - MS1 = dict:store(MsgId, {published, ChPid}, MS), - {SQ1, MS2} = - case dict:find(ChPid, SQ) of - error -> - {SQ, MS1}; - {ok, MQ} -> - case queue:out(MQ) of - {empty, _MQ} -> - {SQ, MS1}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ1} -> - %% We received the msg from the channel - %% first. Thus we need to deal with confirms - %% here. - {dict:store(ChPid, MQ1, SQ), - case needs_confirming(Delivery, State1) of - never -> - MS; - eventually -> - dict:store( - MsgId, {published, ChPid, MsgSeqNo}, MS); - immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - MS - end}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} -> - %% The instruction was sent to us before we - %% were within the mirror_pids within the - %% #amqqueue{} record. We'll never receive the - %% message directly from the channel. And the - %% channel will not be expecting any confirms - %% from us. - {SQ, MS} - end + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ2} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid}, MS)}; + {{value, {Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We received the msg from the channel first. Thus we + %% need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> + {MQ2, PendingCh, MS}; + eventually -> + {MQ2, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + {MQ2, PendingCh, MS} + end; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the mirror_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. And the channel will not be + %% expecting any confirms from us. + {MQ, PendingCh, MS} end, - State2 = State1 #state { sender_queues = SQ1, - msg_id_status = MS2 }, + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, {ok, case Deliver of @@ -727,33 +738,28 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, %% Many of the comments around the publish head above apply here %% too. State1 = ensure_monitoring(ChPid, State), - MS1 = dict:store(MsgId, discarded, MS), - {SQ1, MS2} = - case dict:find(ChPid, SQ) of - error -> - {SQ, MS1}; - {ok, MQ} -> - case queue:out(MQ) of - {empty, _MQ} -> - {SQ, MS1}; - {{value, {#delivery { - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ1} -> - %% We've already seen it from the channel, - %% we're not going to see this again, so don't - %% add it to MS - {dict:store(ChPid, MQ1, SQ), MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} -> - %% The instruction was sent to us before we - %% were within the mirror_pids within the - %% #amqqueue{} record. We'll never receive the - %% message directly from the channel. - {SQ, MS} - end + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, discarded, MS)}; + {{value, {#delivery { message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We've already seen it from the channel, we're not + %% going to see this again, so don't add it to MS + {MQ2, PendingCh, MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the mirror_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. + {MQ, PendingCh, MS} end, + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), BQS1 = BQ:discard(Msg, ChPid, BQS), {ok, State1 #state { sender_queues = SQ1, - msg_id_status = MS2, + msg_id_status = MS1, backing_queue_state = BQS1 }}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, @@ -815,15 +821,23 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, end}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, + msg_id_status = MS, known_senders = KS }) -> {ok, case dict:find(ChPid, KS) of error -> State; {ok, MRef} -> true = erlang:demonitor(MRef), - KS1 = dict:erase(ChPid, KS), - SQ1 = dict:erase(ChPid, SQ), - State #state { sender_queues = SQ1, known_senders = KS1 } + MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = dict:erase(ChPid, KS) } end}; process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, |
