summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_slave.erl15
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d79067bec6..3423295a1a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -78,7 +78,10 @@
known_senders,
synchronised,
- external_pending
+ %% Records the pending acks on the master for messages that we
+ %% do not have. This is necessary to accurately determine
+ %% whether the slave is synchronised or not.
+ unknown_pending
}).
start_link(Q) ->
@@ -133,7 +136,7 @@ init(#amqqueue { name = QueueName } = Q) ->
known_senders = pmon:new(),
synchronised = false,
- external_pending = undefined
+ unknown_pending = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -884,7 +887,7 @@ process_instruction({sender_death, ChPid},
end};
process_instruction({length, Length, ExtPending}, State) ->
{ok, set_synchronised(Length,
- State #state { external_pending = ExtPending })};
+ State #state { unknown_pending = ExtPending })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -910,14 +913,14 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(_, _, State = #state { external_pending = undefined }) ->
+set_synchronised(_, _, State = #state { unknown_pending = undefined }) ->
State;
set_synchronised(PendingDelta, Length,
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- external_pending = ExtPending }) ->
+ unknown_pending = ExtPending }) ->
ExtPending1 = ExtPending + PendingDelta,
- State1 = State #state { external_pending = ExtPending1 },
+ State1 = State #state { unknown_pending = ExtPending1 },
case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
true -> set_synchronised1(true, State1);
false when ExtPending1 >= 0 -> set_synchronised1(false, State1)