diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-02 13:01:30 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-02 13:01:30 +0000 |
| commit | dacc27d78b8ff020a41a6919cd3f5a54fc9d586c (patch) | |
| tree | 44d2306da308e9ed764ed218a8a1dcdd349d5710 | |
| parent | 7768d11214271ced5f957b4fcf3b3b6a966d8cd3 (diff) | |
| download | rabbitmq-server-git-dacc27d78b8ff020a41a6919cd3f5a54fc9d586c.tar.gz | |
Fixed confirms in HA queues. Broke slave promotion. Will fix
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 168 |
1 files changed, 104 insertions, 64 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index df9a28f4a3..5c101ee2d5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -88,9 +88,8 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag - seen, %% Set Guid - guid_to_channel %% for confirms + guid_status }). -define(SYNC_INTERVAL, 25). %% milliseconds @@ -144,9 +143,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), - seen = sets:new(), - - guid_to_channel = dict:new() + guid_status = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; @@ -318,35 +315,41 @@ maybe_run_queue_via_backing_queue( {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS), confirm_messages(Guids, State #state { backing_queue_state = BQS1 }). -record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) -> - GTC; -record_confirm_or_confirm( - #delivery { sender = ChPid, - message = #basic_message { is_persistent = true, - guid = Guid }, - msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) -> - dict:store(Guid, {ChPid, MsgSeqNo}, GTC); -record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo }, - _Q, GTC) -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), - GTC. - -confirm_messages(Guids, State = #state { guid_to_channel = GTC }) -> - {CMs, GTC1} = + +needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> + never; +needs_confirming(#delivery { message = #basic_message { + is_persistent = true } }, + #state { q = #amqqueue { durable = true } }) -> + eventually; +needs_confirming(_Delivery, _State) -> + immediately. + +confirm_messages(Guids, State = #state { guid_status = GS }) -> + {GS1, CMs} = lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; - _ -> - {CMs, GTC0} + fun (Guid, {GSN, CMsN} = Acc) -> + %% We will never see {confirmed, ChPid} here. + case dict:find(Guid, GSN) of + error -> + %% If it needed confirming, it'll have + %% already been done. + Acc; + {ok, {published, ChPid}} -> + %% Still not seen it from the channel, just + %% record that it's been confirmed. + {dict:store(Guid, {confirmed, ChPid}, GSN), CMsN}; + {ok, {published, ChPid, MsgSeqNo}} -> + %% Seen from both GM and Channel. Can now + %% confirm. + {dict:erase(Guid, GSN), + gb_trees_cons(ChPid, MsgSeqNo, CMsN)} end - end, {gb_trees:empty(), GTC}, Guids), - gb_trees:map(fun(ChPid, MsgSeqNos) -> - rabbit_channel:confirm(ChPid, MsgSeqNos) + end, {GS, gb_trees:empty()}, Guids), + gb_trees:map(fun (ChPid, MsgSeqNos) -> + ok = rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), - State #state { guid_to_channel = GTC1 }. + State #state { guid_status = GS1 }. gb_trees_cons(Key, Value, Tree) -> case gb_trees:lookup(Key, Tree) of @@ -363,7 +366,6 @@ promote_me(From, #state { q = Q, backing_queue_state = BQS, rate_timer_ref = RateTRef, sender_queues = SQ, - seen = Seen, guid_ack = GA }) -> rabbit_log:info("Promoting slave ~p for ~s~n", [self(), rabbit_misc:rs(Q #amqqueue.name)]), @@ -371,8 +373,9 @@ promote_me(From, #state { q = Q, true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), + %% TODO fix up seen MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, Seen), + CPid, BQ, BQS, GM, sets:new()), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just @@ -441,59 +444,97 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. maybe_enqueue_message( - Delivery = #delivery { message = #basic_message { guid = Guid }, - sender = ChPid }, - State = #state { q = Q, - sender_queues = SQ, - seen = Seen, - guid_to_channel = GTC }) -> - case sets:is_element(Guid, Seen) of - true -> - GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), - State #state { guid_to_channel = GTC1, - seen = sets:del_element(Guid, Seen) }; - false -> + Delivery = #delivery { message = #basic_message { guid = Guid }, + msg_seq_no = MsgSeqNo, + sender = ChPid }, + State = #state { sender_queues = SQ, + guid_status = GS }) -> + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(Guid, GS) of + error -> MQ = case dict:find(ChPid, SQ) of {ok, MQ1} -> MQ1; error -> queue:new() end, SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ), - State #state { sender_queues = SQ1 } + State #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { guid_status = dict:erase(Guid, GS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State) of + never -> + State #state { guid_status = dict:erase(Guid, GS) }; + eventually -> + State #state { + guid_status = dict:store( + Guid, {published, ChPid, MsgSeqNo}, GS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { guid_status = dict:erase(Guid, GS) } + end end. process_instruction( {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }}, - State = #state { q = Q, - sender_queues = SQ, + State = #state { sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA, - seen = Seen, - guid_to_channel = GTC }) -> - {SQ1, Seen1, GTC1} = + guid_status = GS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the guid confirmed until we can associate it + %% with a msg_seq_no. + GS1 = dict:store(Guid, {published, ChPid}, GS), + {SQ1, GS2} = case dict:find(ChPid, SQ) of error -> - {SQ, sets:add_element(Guid, Seen), GTC}; + {SQ, GS1}; {ok, MQ} -> case queue:out(MQ) of {empty, _MQ} -> - {SQ, sets:add_element(Guid, Seen), GTC}; + {SQ, GS1}; {{value, Delivery = #delivery { - message = #basic_message { guid = Guid } }}, + msg_seq_no = MsgSeqNo, + message = #basic_message { guid = Guid } }}, MQ1} -> - GTC2 = record_confirm_or_confirm(Delivery, Q, GTC), - {dict:store(ChPid, MQ1, SQ), Seen, GTC2}; + %% We received the msg from the channel + %% first. Thus we need to deal with confirms + %% here. + {dict:store(ChPid, MQ1, SQ), + case needs_confirming(Delivery, State) of + never -> + GS; + eventually -> + dict:store( + Guid, {published, ChPid, MsgSeqNo}, GS); + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + GS + end}; {{value, #delivery {}}, _MQ1} -> %% The instruction was sent to us before we %% were within the mirror_pids within the - %% amqqueue record. We'll never receive the - %% message directly. - {SQ, Seen, GTC} + %% #amqqueue{} record. We'll never receive the + %% message directly from the channel. And the + %% channel will not be expecting any confirms + %% from us. + {SQ, GS} end end, - State1 = State #state { sender_queues = SQ1, - seen = Seen1, - guid_to_channel = GTC1 }, + + State1 = State #state { sender_queues = SQ1, + guid_status = GS2 }, {ok, case Deliver of false -> @@ -507,8 +548,7 @@ process_instruction( false -> GA end, State1 #state { backing_queue_state = BQS1, - guid_ack = GA1, - guid_to_channel = GTC1 } + guid_ack = GA1 } end}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, |
