diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 18:42:49 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 18:42:49 +0100 |
| commit | 3bf9ed90d6ccaa806008c8bbfd1380dcbdff304a (patch) | |
| tree | 3b30ca4fd43d1c4bd76b3cb60fbb789519ce3982 | |
| parent | e2733fe49271fd4ee6079e272a3309cbdf8e99eb (diff) | |
| download | rabbitmq-server-git-3bf9ed90d6ccaa806008c8bbfd1380dcbdff304a.tar.gz | |
`external_pending' => `unknown_pending'
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d79067bec6..3423295a1a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -78,7 +78,10 @@ known_senders, synchronised, - external_pending + %% Records the pending acks on the master for messages that we + %% do not have. This is necessary to accurately determine + %% whether the slave is synchronised or not. + unknown_pending }). start_link(Q) -> @@ -133,7 +136,7 @@ init(#amqqueue { name = QueueName } = Q) -> known_senders = pmon:new(), synchronised = false, - external_pending = undefined + unknown_pending = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -884,7 +887,7 @@ process_instruction({sender_death, ChPid}, end}; process_instruction({length, Length, ExtPending}, State) -> {ok, set_synchronised(Length, - State #state { external_pending = ExtPending })}; + State #state { unknown_pending = ExtPending })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -910,14 +913,14 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(_, _, State = #state { external_pending = undefined }) -> +set_synchronised(_, _, State = #state { unknown_pending = undefined }) -> State; set_synchronised(PendingDelta, Length, State = #state { backing_queue = BQ, backing_queue_state = BQS, - external_pending = ExtPending }) -> + unknown_pending = ExtPending }) -> ExtPending1 = ExtPending + PendingDelta, - State1 = State #state { external_pending = ExtPending1 }, + State1 = State #state { unknown_pending = ExtPending1 }, case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of true -> set_synchronised1(true, State1); false when ExtPending1 >= 0 -> set_synchronised1(false, State1) |
