diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 11:13:50 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-26 11:13:50 +0100 |
| commit | b8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f (patch) | |
| tree | 6bc193a3320e26fb70f5ddc9ecc0c8d0c389f06f | |
| parent | 1e23216be1a04e83e512d21760d5edf3e3d72e36 (diff) | |
| parent | b7c77eb3de9c10e112ca68643e74408b8227d70a (diff) | |
| download | rabbitmq-server-git-b8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f.tar.gz | |
Merge bug25182
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 98 |
1 files changed, 45 insertions, 53 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1f6567e0e4..039b274908 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -800,27 +800,23 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of true -> State1; - false -> set_synchronised(ToDrop - Dropped, State1) + false -> update_delta(ToDrop - Dropped, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {State1, Delta} = - case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - {maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }), - 0}; + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + _ when QLen =< Remaining andalso AckRequired -> + State; _ when QLen =< Remaining -> - {State, case AckRequired of - true -> 0; - false -> -1 - end} - end, - {ok, set_synchronised(Delta, State1)}; + update_delta(-1, State) + end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -828,9 +824,9 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(MsgIds1) - length(MsgIds), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, update_delta(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -859,8 +855,8 @@ process_instruction({sender_death, ChPid}, process_instruction({depth, Depth}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised( - 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; + {ok, set_delta(Depth - BQ:depth(BQS), State)}; + process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -886,38 +882,34 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(Delta, State) -> - set_synchronised(Delta, false, State). +set_delta(0, State = #state { depth_delta = undefined }) -> + ok = record_synchronised(State#state.q), + State #state { depth_delta = 0 }; +set_delta(NewDelta, State = #state { depth_delta = undefined }) -> + true = NewDelta > 0, %% assertion + State #state { depth_delta = NewDelta }; +set_delta(NewDelta, State = #state { depth_delta = Delta }) -> + update_delta(NewDelta - Delta, State). -set_synchronised(_Delta, _AddAnyway, - State = #state { depth_delta = undefined }) -> +update_delta(_DeltaChange, State = #state { depth_delta = undefined }) -> State; -set_synchronised(Delta, AddAnyway, - State = #state { depth_delta = DepthDelta, - q = #amqqueue { name = QName }}) -> - DepthDelta1 = DepthDelta + Delta, - %% We intentionally leave out the head where a slave becomes - %% unsynchronised: we assert that can never happen. - %% The `AddAnyway' param is there since in the `depth' instruction we - %% receive the master depth for the first time, and we want to set the sync - %% state anyway if we are synced. - case DepthDelta1 =:= 0 of - true when not (DepthDelta =:= 0) orelse AddAnyway -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - %% We might be there already, in the `AddAnyway' - %% case - SSPids1 = SSPids -- [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) - end - end); - _ when DepthDelta1 >= 0 -> - ok - end, - State #state { depth_delta = DepthDelta1 }. +update_delta( DeltaChange, State = #state { depth_delta = 0 }) -> + 0 = DeltaChange, %% assertion: we cannot become unsync'ed + State; +update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> + true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed + set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). + +record_synchronised(#amqqueue { name = QName }) -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q = #amqqueue { sync_slave_pids = SSPids }] -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { sync_slave_pids = [Self | SSPids] }), + ok + end + end). |
