summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-26 11:13:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-26 11:13:50 +0100
commitb8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f (patch)
tree6bc193a3320e26fb70f5ddc9ecc0c8d0c389f06f /src
parent1e23216be1a04e83e512d21760d5edf3e3d72e36 (diff)
parentb7c77eb3de9c10e112ca68643e74408b8227d70a (diff)
downloadrabbitmq-server-git-b8bd0e50ccd97d1e115ada92f2ca50a8e4432c9f.tar.gz
Merge bug25182
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_slave.erl98
1 files changed, 45 insertions, 53 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1f6567e0e4..039b274908 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -800,27 +800,23 @@ process_instruction({drop, Length, Dropped, AckRequired},
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
true -> State1;
- false -> set_synchronised(ToDrop - Dropped, State1)
+ false -> update_delta(ToDrop - Dropped, State1)
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {State1, Delta} =
- case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- {maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 }),
- 0};
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
+ _ when QLen =< Remaining andalso AckRequired ->
+ State;
_ when QLen =< Remaining ->
- {State, case AckRequired of
- true -> 0;
- false -> -1
- end}
- end,
- {ok, set_synchronised(Delta, State1)};
+ update_delta(-1, State)
+ end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -828,9 +824,9 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, update_delta(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -859,8 +855,8 @@ process_instruction({sender_death, ChPid},
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(
- 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
+ {ok, set_delta(Depth - BQ:depth(BQS), State)};
+
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -886,38 +882,34 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(Delta, State) ->
- set_synchronised(Delta, false, State).
+set_delta(0, State = #state { depth_delta = undefined }) ->
+ ok = record_synchronised(State#state.q),
+ State #state { depth_delta = 0 };
+set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
+ true = NewDelta > 0, %% assertion
+ State #state { depth_delta = NewDelta };
+set_delta(NewDelta, State = #state { depth_delta = Delta }) ->
+ update_delta(NewDelta - Delta, State).
-set_synchronised(_Delta, _AddAnyway,
- State = #state { depth_delta = undefined }) ->
+update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(Delta, AddAnyway,
- State = #state { depth_delta = DepthDelta,
- q = #amqqueue { name = QName }}) ->
- DepthDelta1 = DepthDelta + Delta,
- %% We intentionally leave out the head where a slave becomes
- %% unsynchronised: we assert that can never happen.
- %% The `AddAnyway' param is there since in the `depth' instruction we
- %% receive the master depth for the first time, and we want to set the sync
- %% state anyway if we are synced.
- case DepthDelta1 =:= 0 of
- true when not (DepthDelta =:= 0) orelse AddAnyway ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- %% We might be there already, in the `AddAnyway'
- %% case
- SSPids1 = SSPids -- [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
- end
- end);
- _ when DepthDelta1 >= 0 ->
- ok
- end,
- State #state { depth_delta = DepthDelta1 }.
+update_delta( DeltaChange, State = #state { depth_delta = 0 }) ->
+ 0 = DeltaChange, %% assertion: we cannot become unsync'ed
+ State;
+update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
+ true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
+ set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+
+record_synchronised(#amqqueue { name = QName }) ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q = #amqqueue { sync_slave_pids = SSPids }] ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
+ ok
+ end
+ end).