diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 125 |
2 files changed, 135 insertions, 53 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0d64ab8e77..4628796f57 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/4]). +-export([promote_backing_queue_state/5]). -behaviour(rabbit_backing_queue). @@ -36,7 +36,8 @@ coordinator, backing_queue, backing_queue_state, - set_delivered + set_delivered, + fakes }). %% --------------------------------------------------------------------------- @@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0 }. + set_delivered = 0, + fakes = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS) }. + set_delivered = BQ:len(BQS), + fakes = Fakes }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -126,30 +129,54 @@ dropwhile(Fun, State = #state { gm = GM, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered }) -> + set_delivered = SetDelivered, + fakes = Fakes }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> - {Result, State1}; + {Result, State #state { backing_queue_state = BQS1 }}; {#basic_message { guid = Guid } = Message, IsDelivered, AckTag, Remaining} -> - ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), - {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1 }} + case sets:is_element(Guid, Fakes) of + true -> + {BQS2, Fakes1} = + case AckRequired of + true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1), + {BQS3, Fakes}; + false -> {BQS1, sets:del_element(Guid, Fakes)} + end, + ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}), + fetch(AckRequired, + State #state { backing_queue_state = BQS2, + set_delivered = SetDelivered1, + fakes = Fakes1 }); + false -> + ok = gm:broadcast(GM, + {fetch, AckRequired, Guid, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + Fakes1 = case SetDelivered + SetDelivered1 of + 1 -> sets:new(); %% transition to 0 + _ -> Fakes + end, + {{Message, IsDelivered1, AckTag, Remaining}, + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1, + fakes = Fakes1 }} + end end. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + fakes = Fakes }) -> {Guids, BQS1} = BQ:ack(AckTags, BQS), - case Guids of - [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, Guids}) - end, - {Guids, State #state { backing_queue_state = BQS1 }}. + Fakes1 = case Guids of + [] -> Fakes; + _ -> ok = gm:broadcast(GM, {ack, Guids}), + sets:difference(Fakes, sets:from_list(Guids)) + end, + {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}. tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) -> %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid}) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f124bc9eb0..0134787cd0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -75,6 +75,7 @@ -behaviour(gm). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include("gm_specs.hrl"). -record(state, { q, @@ -87,6 +88,7 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag instructions, %% :: InstrQ + fakes, %% :: Set Guid guid_to_channel %% for confirms }). @@ -141,6 +143,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), instructions = queue:new(), + fakes = sets:new(), guid_to_channel = dict:new() }, hibernate, @@ -190,7 +193,7 @@ handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) -> State1 = State #state { instructions = queue:in(Instruction, InstrQ) }, case queue:is_empty(InstrQ) of - true -> handle_process_result(process_instructions(State1)); + true -> handle_process_result(process_instructions(false, State1)); false -> noreply(State1) end; @@ -320,21 +323,25 @@ maybe_confirm_message(Guid, GTC) -> handle_process_result({continue, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. -promote_me(From, #state { q = Q, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = RateTRef, - sender_queues = SQ, - guid_ack = GA }) -> +promote_me(From, State = #state { q = Q }) -> rabbit_log:info("Promoting slave ~p for queue ~p~n", [self(), Q #amqqueue.name]), + #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + guid_ack = GA, + instructions = Instr, + fakes = Fakes } = + process_instructions(true, State), + true = queue:is_empty(Instr), %% ASSERTION {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM), + CPid, BQ, BQS, GM, Fakes), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just @@ -387,19 +394,19 @@ enqueue_message(Delivery = #delivery { sender = ChPid }, SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ), State1 = State #state { sender_queues = SQ1 }, case queue:is_empty(Q) of - true -> process_instructions(State1); + true -> process_instructions(false, State1); false -> {continue, State1} end. -process_instructions(State = #state { instructions = InstrQ }) -> +process_instructions(Flush, State = #state { instructions = InstrQ }) -> case queue:out(InstrQ) of {empty, _InstrQ} -> {continue, State}; {{value, Instr}, InstrQ1} -> - case process_instruction(Instr, State) of + case process_instruction(Flush, Instr, State) of {processed, State1} -> process_instructions( - State1 #state { instructions = InstrQ1 }); + Flush, State1 #state { instructions = InstrQ1 }); {stop, State1} -> {stop, State1 #state { instructions = InstrQ1 }}; blocked -> @@ -407,20 +414,38 @@ process_instructions(State = #state { instructions = InstrQ }) -> end end. -process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, +process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr, State = #state { q = Q, sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA, - guid_to_channel = GTC }) -> + guid_to_channel = GTC, + fakes = Fakes }) -> case dict:find(ChPid, SQ) of error -> - blocked; + case Flush of + true -> MQ = queue:from_list([fake_delivery(Q, Guid, ChPid)]), + State1 = State #state { + sender_queues = dict:store(ChPid, MQ, SQ), + fakes = sets:add_element(Guid, Fakes) }, + process_instruction(Flush, Instr, State1); + false -> blocked + end; {ok, MQ} -> case queue:out(MQ) of {empty, _MQ} -> - blocked; + case Flush of + true -> + MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ), + SQ1 = dict:store(ChPid, MQ1, SQ), + State1 = State #state { + sender_queues = SQ1, + fakes = sets:add_element(Guid, Fakes) }, + process_instruction(Flush, Instr, State1); + false -> + blocked + end; {{value, Delivery = #delivery { message = Msg = #basic_message { guid = Guid } }}, MQ1} -> @@ -449,28 +474,41 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, guid_to_channel = GTC2 } end}; {{value, #delivery {}}, _MQ1} -> - %% throw away the instruction: we'll never receive - %% the message to which it corresponds. - {processed, State} + MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ), + State1 = State #state { + sender_queues = dict:store(ChPid, MQ1, SQ), + fakes = sets:add_element(Guid, Fakes) }, + process_instruction(Flush, Instr, State1) end end; -process_instruction({set_length, Length}, +process_instruction(_Flush, {set_length, Length}, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + fakes = Fakes }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, {processed, case ToDrop > 0 of - true -> BQS1 = lists:foldl( - fun (const, BQSN) -> BQ:fetch(false, BQSN) end, - BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; - false -> State + true -> + {Guids, BQS1} = + lists:foldl( + fun (const, {GuidsN, BQSN}) -> + {{#basic_message { guid = Guid }, _IsDelivered, + _AckTag, _Remaining}, BQSN1} = + BQ:fetch(false, BQSN), + {[Guid | GuidsN], BQSN1} + end, BQS, lists:duplicate(ToDrop, const)), + Fakes1 = sets:difference(Fakes, sets:from_list(Guids)), + State #state { backing_queue_state = BQS1, + fakes = Fakes1 }; + false -> + State end}; -process_instruction({fetch, AckRequired, Guid, Remaining}, +process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS, - guid_ack = GA }) -> + guid_ack = GA, + fakes = Fakes }) -> QLen = BQ:len(BQS), {processed, case QLen - 1 of @@ -481,22 +519,28 @@ process_instruction({fetch, AckRequired, Guid, Remaining}, true -> dict:store(Guid, AckTag, GA); false -> GA end, + Fakes1 = sets:del_element(Guid, Fakes), State #state { backing_queue_state = BQS1, - guid_ack = GA1 }; + guid_ack = GA1, + fakes = Fakes1 }; Other when Other < Remaining -> %% we must be shorter than the master + false = sets:is_element(Guid, Fakes), %% ASSERTION State end}; -process_instruction({ack, Guids}, +process_instruction(_Flush, {ack, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, - guid_ack = GA }) -> + guid_ack = GA, + fakes = Fakes }) -> {AckTags, GA1} = guids_to_acktags(Guids, GA), {Guids1, BQS1} = BQ:ack(AckTags, BQS), [] = Guids1 -- Guids, %% ASSERTION + Fakes1 = sets:difference(Fakes, sets:from_list(Guids)), {processed, State #state { guid_ack = GA1, - backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgPropsFun, Guids}, + backing_queue_state = BQS1, + fakes = Fakes1 }}; +process_instruction(_Flush, {requeue, MsgPropsFun, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> @@ -515,7 +559,7 @@ process_instruction({requeue, MsgPropsFun, Guids}, State #state { guid_ack = dict:new(), backing_queue_state = BQS2 } end}; -process_instruction(delete_and_terminate, +process_instruction(_Flush, delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(BQS), @@ -534,3 +578,14 @@ guids_to_acktags(Guids, GA) -> ack_all(BQ, GA, BQS) -> BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS). + +fake_delivery(#amqqueue { name = QueueName }, Guid, ChPid) -> + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), + Msg = (rabbit_basic:message(ExchangeName, <<>>, #'P_basic'{}, <<>>)) + #basic_message { guid = Guid }, + #delivery { mandatory = false, + immediate = false, + txn = none, + sender = ChPid, + message = Msg, + msg_seq_no = undefined }. |
