diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 115 |
2 files changed, 65 insertions, 58 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 477449e3e9..094b83c973 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) -> purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0, false}), + ok = gm:broadcast(GM, {set_length, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired, Len = BQ:len(BQS), {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, + ok = gm:broadcast(GM, {set_length, Len1, Dropped, AckRequired}), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Next, Msgs, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 } }. @@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, BQS1} = BQ:ack(AckTags, BQS), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)}) end, AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, @@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - ok = gm:broadcast(GM, {requeue, MsgIds}), + ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}), {MsgIds, State #state { backing_queue_state = BQS1 }}. len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ef43d96e5f..2c60acf061 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -77,7 +77,8 @@ msg_id_status, known_senders, - synchronised + synchronised, + external_pending }). start_link(Q) -> @@ -131,7 +132,8 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + synchronised = false, + external_pending = 0 }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -809,71 +811,67 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, +process_instruction({set_length, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + set_synchronised( + Length, + case ToDrop >= 0 of + true -> + State1 = + lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, + BQSN1} = BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + case AckRequired of + true -> set_synchronised(ToDrop, Dropped, Length, State1); + false -> State1 + end; + false -> + State + end)}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + external_pending = ExtPending }) -> QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> + {ok, case {QLen - 1, AckRequired} of + {Remaining, _} -> {{#basic_message{id = MsgId}, _IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State + {_, false} when QLen =< Remaining -> + set_synchronised(Remaining, State); + {_, true} when QLen =< Remaining -> + State #state { external_pending = ExtPending + 1} end}; -process_instruction({ack, MsgIds}, +process_instruction({ack, MsgIds, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgIds}, + {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; +process_instruction({requeue, MsgIds, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The only thing we can safely do is nuke out our BQ - %% and MA. The interaction between this and confirms - %% doesn't really bear thinking about... - {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -891,10 +889,8 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; +process_instruction({length, Length}, State) -> + {ok, set_synchronised(Length, State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -913,9 +909,6 @@ msg_ids_to_acktags(MsgIds, MA) -> end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, @@ -923,9 +916,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. +set_synchronised(LocalPending, RemotePending, Length, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + external_pending = ExtPending }) -> + ExtPending1 = ExtPending - (RemotePending - LocalPending), + State1 = State #state { external_pending = ExtPending1 }, + case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of + true -> set_synchronised1(true, State1); + false when ExtPending1 >= 0 -> set_synchronised1(false, State1) + end. + +set_synchronised(Length, State) -> + set_synchronised(0, 0, Length, State). + %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = #amqqueue { name = QName }, +set_synchronised1(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( @@ -939,7 +946,7 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName }, end end), State #state { synchronised = true }; -set_synchronised(true, State) -> +set_synchronised1(true, State) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> +set_synchronised1(false, State = #state { synchronised = false }) -> State. |
